Arhn - архитектура программирования

Получение исключения при записи набора данных в Hive

Я пытаюсь записать DataSet в базу данных Hive с помощью Spark Java, но в процессе я получаю исключение.

Это мой код:

 Dataset<Row> data = spark.read().json(rdd).select("event.event_name");
 data.write().mode("overwrite").saveAsTable("telecom.t2");

Здесь rdd — это потоковые данные json, и я могу распечатать результат data с помощью следующей команды.

data.show();

Но когда я пытаюсь записать этот результат в базу данных Hive, я не получаю никаких исключений, но я получаю исключение в Hive command line, когда пытаюсь напечатать эти значения. Например:

select * from telecom.t2;

И исключение:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317)
    at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219)
    at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
    at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
    at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
    at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
    at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
    at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
    at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
    at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
    at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:673)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:323)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:445)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:414)
    at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
    at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1670)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
    at java.lang.Runtime.loadLibrary0(Runtime.java:870)
    at java.lang.System.loadLibrary(System.java:1122)
    at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
    ... 48 more
Exception in thread "main" org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
    at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:229)
    at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
    at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
    at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
    at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
    at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
    at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
    at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
    at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
    at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
    at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
    at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
    at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
    at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
    at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:673)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:323)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:445)
    at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:414)
    at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
    at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1670)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
2 Jan, 2017 12:02:40 PM WARNING: parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 12 records.
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: block read in memory in 29 ms. row count = 12

Ответы:


1

Spark сохраняет данные в формате parquet.snappy по умолчанию, когда вы вызываете saveAsTable, и кажется, что у вас нет быстрого пути к библиотеке куста. Изменение формата записи (например, на json) не будет работать, поскольку Hive ожидает, что файлы последовательности будут созданы в таблице с использованием этого параметра.

Но вы можете изменить алгоритм сжатия перед сохранением данных в виде таблицы:

spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

Сжатие Gzip должно быть доступно в Hive по умолчанию, в случае каких-либо проблем вы все равно можете сохранить данные без сжатия:

spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
02.01.2017
  • Спасибо за ответ @Mariusz, когда я попытался, как вы сказали, я получил следующее исключение в улье: * Ошибка с исключением java.io.IOException: java.io.IOException: hdfs://localhost:8020/user/hive/warehouse /telecom.db/t2/part-r-00000-0bd1c641-0e4d-4a8c-98a3-b8d31ecd4172.json не является файлом последовательности* 02.01.2017
  • CREATE TABLE t2( event_name string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' С SERDEPROPERTIES ('path'='hdfs://localhost:8020/user/hive/warehouse /telecom.db/t2') ХРАНИТСЯ КАК ВХОДНОЙ ФОРМАТ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' РАСПОЛОЖЕНИЕ 'hdfs: //локальный:8020/пользователь/улей/склад/telecom.db/t2' 02.01.2017
  • TBLPROPERTIES ('COLUMN_STATS_ACCURATE'='false', 'EXTERNAL'='FALSE', 'numFiles'='1', 'numRows'='-1', 'rawDataSize'='-1', 'spark.sql.sources .provider'='parquet', 'spark.sql.sources.schema.numParts'='1', 'spark.sql.sources.schema.part.0'='{\type\:\struct\,\fields \:[{\name\:\event_name\,\type\:\string\,\nullable\:true,\metadata\:{}}]}', 'totalSize'='1134', 'transient_lastDdlTime'=' 1483342731') Затрачено времени: 0,578 секунды, получено: 23 строки 02.01.2017
  • Вы правы, у Hive есть некоторые проблемы с чтением таблиц с измененным форматом. Попробуйте изменить метод сжатия с snappy на gzip — см. мой обновленный ответ. 02.01.2017
  • получение такого же исключения 02.01.2017
  • Убедитесь, что вы не указали format, или формат принудительно паркет: .format("parquet") 02.01.2017

  • Новые материалы

    Коллекции публикаций по глубокому обучению
    Последние пару месяцев я создавал коллекции последних академических публикаций по различным подполям глубокого обучения в моем блоге https://amundtveit.com - эта публикация дает обзор 25..

    Представляем: Pepita
    Фреймворк JavaScript с открытым исходным кодом Я знаю, что недостатка в фреймворках JavaScript нет. Но я просто не мог остановиться. Я хотел написать что-то сам, со своими собственными..

    Советы по коду Laravel #2
    1-) Найти // You can specify the columns you need // in when you use the find method on a model User::find(‘id’, [‘email’,’name’]); // You can increment or decrement // a field in..

    Работа с временными рядами спутниковых изображений, часть 3 (аналитика данных)
    Анализ временных рядов спутниковых изображений для данных наблюдений за большой Землей (arXiv) Автор: Рольф Симоэс , Жильберто Камара , Жильберто Кейрос , Фелипе Соуза , Педро Р. Андраде ,..

    3 способа решить квадратное уравнение (3-й мой любимый) -
    1. Методом факторизации — 2. Используя квадратичную формулу — 3. Заполнив квадрат — Давайте поймем это, решив это простое уравнение: Мы пытаемся сделать LHS,..

    Создание VR-миров с A-Frame
    Виртуальная реальность (и дополненная реальность) стали главными модными терминами в образовательных технологиях. С недорогими VR-гарнитурами, такими как Google Cardboard , и использованием..

    Демистификация рекурсии
    КОДЕКС Демистификация рекурсии Упрощенная концепция ошеломляющей О чем весь этот шум? Рекурсия, кажется, единственная тема, от которой у каждого начинающего студента-информатика..