Я пытаюсь записать DataSet в базу данных Hive с помощью Spark Java, но в процессе я получаю исключение.
Это мой код:
Dataset<Row> data = spark.read().json(rdd).select("event.event_name");
data.write().mode("overwrite").saveAsTable("telecom.t2");
Здесь rdd
— это потоковые данные json, и я могу распечатать результат data
с помощью следующей команды.
data.show();
Но когда я пытаюсь записать этот результат в базу данных Hive, я не получаю никаких исключений, но я получаю исключение в Hive command line
, когда пытаюсь напечатать эти значения. Например:
select * from telecom.t2;
И исключение:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:317)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:219)
at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:673)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:323)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:445)
at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:414)
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1670)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
... 48 more
Exception in thread "main" org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:229)
at org.xerial.snappy.Snappy.<clinit>(Snappy.java:44)
at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:208)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:122)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:85)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:72)
at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:673)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:323)
at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:445)
at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:414)
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:140)
at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1670)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:165)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:736)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
2 Jan, 2017 12:02:40 PM WARNING: parquet.hadoop.ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: RecordReader initialized will read a total of 12 records.
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: at row 0. reading next block
2 Jan, 2017 12:02:40 PM INFO: parquet.hadoop.InternalParquetRecordReader: block read in memory in 29 ms. row count = 12
t2
(event_name
string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' С SERDEPROPERTIES ('path'='hdfs://localhost:8020/user/hive/warehouse /telecom.db/t2') ХРАНИТСЯ КАК ВХОДНОЙ ФОРМАТ 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' РАСПОЛОЖЕНИЕ 'hdfs: //локальный:8020/пользователь/улей/склад/telecom.db/t2' 02.01.2017format
, или формат принудительно паркет:.format("parquet")
02.01.2017