2013-12-16 54 views
16

私はSparkにいる、私はAvroファイルからRDDを持っている。私は今、そのRDDにいくつかの変換を行うと、アブロファイルとして戻って保存したい:Spark:Avroファイルに書き込む

val job = new Job(new Configuration()) 
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema)) 

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2)) 
    .saveAsNewAPIHadoopFile(outputPath, 
    classOf[AvroKey[GenericRecord]], 
    classOf[org.apache.hadoop.io.NullWritable], 
    classOf[AvroKeyOutputFormat[GenericRecord]], 
    job.getConfiguration) 

このスパークを実行しているスキーマ$ recordSchemaはシリアライズではないと文句を言い。

.map呼び出しのコメントを外して(そして単にrdd.saveAsNewAPIHadoopFileを持つ)、呼び出しは成功します。

私はここで間違っていますか?

+0

例外スタックトレースを提供してください。 Spark、Hadoop、Avroのバージョン番号も便利です。 – Wildfire

+0

私のnaivenessを許してください。ここで何をしているのか聞いてもよろしいですか?それは地図削減仕事のように見える?書き出しにスパークを使用すると、マップ削減ジョブが必要なのはなぜですか? –

答えて

2

Sparkで使用されるデフォルトのシリアライザは、Javaのシリアル化です。したがって、すべてのJava型に対して、Java直列化を使用して直列化しようとします。 AvroKeyはシリアライズ可能ではないため、エラーが発生しています。

カスタムシリアル化(Avroなど)でKryoSerializerまたはプラグインを使用できます。ここでシリアライズの詳細を読むことができます。 http://spark-project.org/docs/latest/tuning.html

オブジェクトを外部化可能なものでラップすることもできます。ここでAvroFlumeEventをラップするSparkFlumeEventを確認してください:https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

5

ここでの問題は、ジョブで使用されるavro.Schemaクラスの非シリアル化可能性に関するものです。例外は、マップ関数内のコードからスキーマオブジェクトを参照しようとするとスローされます。あなただけのスキーマの新しいインスタンスを作成することで、すべてが動作するように作ることができる

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
... 
rdd.map(t => { 
    // reference to the schema object declared outside 
    val record = new GenericData.Record(schema) 
}) 

:あなたは次のように実行しようとすれば、あなたは「タスク直列化可能ではない」例外が発生します。例えば

、機能ブロック内:あなたはあなたが扱うすべてのレコードのアブロスキーマを解析したくないので

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.map(t => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 
    val record = new GenericData.Record(innserSchema) 
    ... 
}) 

、よりよい解決策は、パーティション・レベルでスキーマを解析することになります。以下も動作します:マップ機能は、複数のリモート実行プログラムによって実行されようとしているので、

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.mapPartitions(tuples => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 

    tuples.map(t => { 
    val record = new GenericData.Record(innserSchema) 
    ... 
    // this closure will be bundled together with the outer one 
    // (no serialization issues) 
    }) 
}) 

を上記のコードは、限り、あなたはjsonSchemaファイルへの移植の参照を提供して動作します。これはHDFSのファイルへの参照でも、JARのアプリケーションと一緒にパッケージ化することもできます(後者の場合はクラスローダ関数を使用して内容を取得します)。スパークでアブロを使用しようとしている人のために

は、いくつかの未解決のコンパイルの問題が残っていることに気づくと、あなたはMavenのPOMに次のインポートを使用する必要があります。

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-mapred</artifactId> 
    <version>1.7.7</version> 
    <classifier>hadoop2</classifier> 
<dependency> 

"hadoop2"分類器を。問題はhttps://issues.apache.org/jira/browse/SPARK-3039で追跡できます。

+0

このメソッドは、マップ関数内に外部依存関係が存在しない場合に有効です。スキーマをシリアライズ可能にする方法はありますか? – COSTA

関連する問題