2017-06-13 21 views
1
val file = File.createTempFile("temp", ".avro") 
val schema = new Schema.Parser().parse(st) 
val datumWriter = new GenericDatumWriter[GenericData.Record](schema) 
val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter) 
dataFileWriter.create(schema , file) 
rdd.foreach(r => { 
    dataFileWriter.append(r) 
}) 
dataFileWriter.close() 

タイプの汎用のレコードのRDDを書き込もうとしたとき、私は私がアブロ形式でHDFSへの書き込みをしようとしていますが、私はこのTask Not Serializableエラーを取得していますタイプGenericData.RecordDStream持っている:作業ではありませんSerializableの例外

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2062) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:217) 
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:210) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.avro.file.DataFileWriter 
Serialization stack: 
- object not serializable (class: org.apache.avro.file.DataFileWriter, value: [email protected]) 
- field (class: KafkaCo$$anonfun$main$3$$anonfun$apply$1, name: dataFileWriter$1, type: class org.apache.avro.file.DataFileWriter) 
- object (class KafkaCo$$anonfun$main$3$$anonfun$apply$1, <function1>) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
+1

RDDオブジェクトをAvroファイルに書き込んで何を達成しようとしていますか? 'df.write.avro("/tmp/output ")' –

答えて

1

lambdaは実行するためにクラスタの周りに分散する必要があるため、シリアライズ可能なデータのみを参照する必要があります。

あなたはおそらく行う可能性がある:

  • mapPartitions(代わりにmapの)メソッドを使用し、各パーティション
  • のための新しいライターを作成
  • 新しいファイルを作成し、それへのハンドルを取得パーティション内に作成したライターでファイルハンドルを使用して、パーティション内の各メッセージをそのファイルに追加します。
  • ストリームが完全に消費されたときにファイルハンドルが閉じられるようにします。
+0

のようなものを使って、DataFrameをAvroフォーマットのファイルに直接保存するhttps://github.com/databricks/spark-avroで略奪するべきですここで私はどこでマップを使用していますか、擬似コードは非常に役に立つでしょう – JSR29

+0

一般的に有効なアドバイスではありますが、私は 'mapPartitions'とローカルオブジェクトがこの場合に役立つとは思いません。この 'dataFileWriter'がどのようにローカルファイル' dataFileWriter.create(schema、file) 'にバインドされているかに注目してください。' mapPartitions'を使用して、各実行者にローカルファイルを作成します。 – maasg

+0

これは、ストリームが完全に消費され、現在の操作モードではないときに閉じられるファイルハンドラを回避することを提案した理由です。今日、私はいくつかのコードで私の答えを編集する時間があることを願っています。 – stefanobaghino

2

ここで重要な点は、DataFileWriterがローカルリソース(ローカルファイルにバインドされている)であることです。シリアル化することは意味がありません。 のようなコードを実行すると、executor-boundアプローチはexecutorのローカルファイルシステムにファイルを書き込むため、コードを変更することも役に立ちません。

私たちは、そのライブラリを使用して、例えば、https://github.com/databricks/spark-avro

をスパークの分散性をサポートする実装を使用する必要があります。

私たちはどうなるcase classによって表されるいくつかのスキーマを、考える:

val structuredRDD = rdd.map(record => recordToSchema(record)) 
val df = structuredRDD.toDF() 
df.write.avro(hdfs_path) 
+0

what's recordToSchema – JSR29

+0

レコードフォーマットをあなたのケースクラスに変換するために書く関数です。 – maasg

+0

私は – JSR29

関連する問題