私はScalaで書かれたSparkプログラムを使用して、HDFSからCSVファイルを読み込み、新しい列を計算して寄せ木ファイルとして保存します。私はYARNクラスターでプログラムを実行しています。しかし、私はそれを起動しようとするたびに、エグゼキュータはこのエラーのある時点で失敗します。YARNモードでスパークジョブが失敗する
このエラーの原因を見つけるお手伝いをしてください。エグゼキュータ上から
はログイン
16/10/27 15:58:10 WARN storage.BlockManager: Putting block rdd_12_225 failed due to an exception
16/10/27 15:58:10 WARN storage.BlockManager: Block rdd_12_225 could not be removed as it was not found on disk or in memory
16/10/27 15:58:10 ERROR executor.Executor: Exception in task 225.0 in stage 4.0 (TID 465)
java.io.IOException: Stream is corrupted
at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.readSize(UnsafeRowSerializer.scala:113)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.<init>(UnsafeRowSerializer.scala:120)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:66)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$3.apply(BlockStoreShuffleReader.scala:62)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:118)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:110)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 15385 of input buffer
at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
... 41 more
EDIT:
コードがユーザー関数A2Pはちょうど2つのダブルを取り、他の二重
を返されvar df = spark.read.option("header", "true").option("inferSchema", "true").option("treatEmptyValuesAsNulls", "true").csv(hdfsFileURLIn).repartition(nPartitions)
df.printSchema()
df = df.withColumn("ipix", a2p(df.col(deName), df.col(raName))).persist(StorageLevel.MEMORY_AND_DISK)
df.repartition(nPartitions, $"ipix").write.mode("overwrite").option("spark.hadoop.dfs.replication", 1).parquet(hdfsFileURLOut)
使用があります
私はこれが比較的小さなCSV(〜1Go)ではうまくいきましたが、th私は再分割を無効にして、私は** _私は置くブロックRDDを得ることはありませんこれでStorageLevel.DISK_ONLY
を使用提案後 :エラーが大きなもので、すべての回起こる(〜15Go)
EDIT 2れます***例外により失敗しましたがLZ4に関連する例外が(ストリームが破損している)まだある:
16/10/28 07:53:00 ERROR util.Utils: Aborting task
java.io.IOException: Stream is corrupted
at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at org.apache.spark.io.LZ4BlockInputStream.available(LZ4BlockInputStream.java:109)
at java.io.BufferedInputStream.read(BufferedInputStream.java:353)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.spark_project.guava.io.ByteStreams.read(ByteStreams.java:899)
at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:127)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$3$$anon$1.next(UnsafeRowSerializer.scala:110)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:254)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 12966 of input buffer
at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:39)
at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:205)
... 25 more
EDIT 3:私は、第2の配分を除去することによって、エラーなしでそれを起動するには、管理(1 ipix列を使用してそのパーティションを再分割する)このメソッドのドキュメントをさらに詳しく見ていきます。
EDIT 4:これは時折、いくつかのエグゼキュータは、セグメンテーションフォールトで失敗、奇妙です:
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007f48d8a47f2c, pid=3501, tid=0x00007f48cc60c700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 1.8.0_102-b14)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# J 4713 C2 org.apache.spark.unsafe.types.UTF8String.hashCode()I (18 bytes) @ 0x00007f48d8a47f2c [0x00007f48d8a47e60+0xcc]
#
# Core dump written. Default location: /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/core or core.3501
#
# An error report file with more information is saved as:
# /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1477580152295_0008/container_1477580152295_0008_01_000006/hs_err_pid3501.log
#
# If you would like to submit a bug report, please visit:
# http://bugreport.java.com/bugreport/crash.jsp
#
私はメモリをチェックし、すべての私の執行は、常に空きメモリをたくさん持っている(少なくとも6Go)
編集4:私は複数のファイルをテストしたので、実行は常に成功しますが、ある種のエグゼキュータが失敗し(上記のエラーで)、YARNによって再度開始されます
もっと探検するためにコードを追加してください。 – Shankar
@Shankar done。 –
あなたは再パーティションせずに試しましたか?ちょうど推測.. – Shankar