ここでの問題は、ジョブで使用されるavro.Schemaクラスの非シリアル化可能性に関するものです。例外は、マップ関数内のコードからスキーマオブジェクトを参照しようとするとスローされます。あなただけのスキーマの新しいインスタンスを作成することで、すべてが動作するように作ることができる
val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
// reference to the schema object declared outside
val record = new GenericData.Record(schema)
})
:あなたは次のように実行しようとすれば、あなたは「タスク直列化可能ではない」例外が発生します。例えば
、機能ブロック内:あなたはあなたが扱うすべてのレコードのアブロスキーマを解析したくないので
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
val record = new GenericData.Record(innserSchema)
...
})
、よりよい解決策は、パーティション・レベルでスキーマを解析することになります。以下も動作します:マップ機能は、複数のリモート実行プログラムによって実行されようとしているので、
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
tuples.map(t => {
val record = new GenericData.Record(innserSchema)
...
// this closure will be bundled together with the outer one
// (no serialization issues)
})
})
を上記のコードは、限り、あなたはjsonSchemaファイルへの移植の参照を提供して動作します。これはHDFSのファイルへの参照でも、JARのアプリケーションと一緒にパッケージ化することもできます(後者の場合はクラスローダ関数を使用して内容を取得します)。スパークでアブロを使用しようとしている人のために
は、いくつかの未解決のコンパイルの問題が残っていることに気づくと、あなたはMavenのPOMに次のインポートを使用する必要があります。
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<dependency>
注"hadoop2"
分類器を。問題はhttps://issues.apache.org/jira/browse/SPARK-3039で追跡できます。
例外スタックトレースを提供してください。 Spark、Hadoop、Avroのバージョン番号も便利です。 – Wildfire
私のnaivenessを許してください。ここで何をしているのか聞いてもよろしいですか?それは地図削減仕事のように見える?書き出しにスパークを使用すると、マップ削減ジョブが必要なのはなぜですか? –