2016-09-04 21 views
0

私は、スパークRDDをgzippedテキストファイル(または複数のテキストファイル)としてS3バケットに保存しようとしています。 S3バケットはdbfsにマウントされています。S3バケットへのテキストファイルとしてのスパークRDDの書き込み

rddDataset.saveAsTextFile("/mnt/mymount/myfolder/") 

をしかし、これをしようとしたとき、私はエラーを取得しておく::私は、次を使用してファイルを保存しようとしている

org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 18.0 failed 4 times, most recent failure: Lost task 32.3 in stage 18.0 (TID 279, ip-10-81-194-225.ec2.internal): java.lang.NullPointerException 

しかし、私はS3バケットに書き込まれたいくつかのファイルを参照しています。また、hereのようにrddDataset.repartition(1).saveAsTextFile("/mnt/mymount/myfolder/")を使ってみましたが、これは同じエラーで終了しました。

これはthis questionと似ているようですので、エラーは私のRDDのヌル値によるものでしょうか?しかし、私がval newRDD = rddDataset.map(line => line).filter(x => x!= null).filter(x => x!=" ").filter(x => x!="")を試してこのRDDを保存しようとすると、同じエラーが発生します。

さらに、rddDataset.count()も同様のエラーが発生します。私はすべての行を表示するデータフレームからrddDatasetを作成しています。私はRDDに私の元のデータフレームを変換する場合は、私がjava.lang.NullPointerExceptionを再現することができます

val testRDD = df.rdd 
testRDD.count() 

> org.apache.spark.SparkException: Job aborted due to stage failure: Task 32 in stage 85.0 failed 4 times, most recent failure: Lost task 32.3 in stage 85.0 (TID 1668, ip-10-81-194-241.ec2.internal): java.lang.NullPointerException 

私は以下のスタックトレースのいずれかを提供してきました:

Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1927) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1209) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1154) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1154) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1060) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1026) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:952) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952) 
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:952) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:951) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1457) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436) 
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1436) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1436) 
Caused by: java.lang.NullPointerException 

また、私は情報を開くときrddDataset.repartition(200).saveAsTextFile(/mnt/mymount/myfolder)を実行した後、私はエラーの詳細を見つけることができ、ステージ用のタブ:

java.lang.NullPointerException 
at linef9b86491b9da46b9858e22af0cc8257227.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:48) 
at linef9b86491b9da46b9858e22af0cc8257227.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:48) 
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr35$(Unknown Source) 
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) 
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51) 
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:46) 
at org.apache.spark.scheduler.Task.run(Task.scala:96) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:235) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
+0

NPEのスタックトレース全体をポストできますか? –

答えて

0

Dを事前に処理している間、私はきちんと私のUDFのいずれかにNULL値を処理していないましたata。具体的には、私は後に動作するように見えた

val converter = (arg: String) => { 
    if (arg == null || arg== "") "" else arg.split("").mkString("_").replace(":","_") 
} 

すべてに

val converter = (arg: String) => { 
    arg.split("").mkString("_").replace(":","_") 
} 

からの私のUDFのいずれかを変更する必要がありました。

関連する問題