2017-02-20 6 views
0

ParDo.of(new ParDoFn())PCollectionに適用したとき、このプログラムはこの例外をスローします。しかし、.apply(ParDo.of(new ParDoFn()))を削除すると、プログラムは正常です。あなたが上で実行されているスパークバージョンAssertionError:アサーションに失敗しました:copyAndResetはゼロ値のコピーを返す必要があります

// SparkRunner

private static void testHadoop(Pipeline pipeline){ 
    Class<? extends FileInputFormat<LongWritable, Text>> inputFormatClass = 
      (Class<? extends FileInputFormat<LongWritable, Text>>) 
        (Class<?>) TextInputFormat.class; 
    @SuppressWarnings("unchecked") //hdfs://localhost:9000 
      HadoopIO.Read.Bound<LongWritable, Text> readPTransfom_1 = HadoopIO.Read.from("hdfs://localhost:9000/tmp/kinglear.txt", 
      inputFormatClass, 
      LongWritable.class, 
      Text.class); 
    PCollection<KV<LongWritable, Text>> textInput = pipeline.apply(readPTransfom_1) 
      .setCoder(KvCoder.of(WritableCoder.of(LongWritable.class), WritableCoder.of(Text.class))); 

    //OutputFormat 
    @SuppressWarnings("unchecked") 
    Class<? extends FileOutputFormat<LongWritable, Text>> outputFormatClass = 
      (Class<? extends FileOutputFormat<LongWritable, Text>>) 
        (Class<?>) TemplatedTextOutputFormat.class; 

    @SuppressWarnings("unchecked") 
    HadoopIO.Write.Bound<LongWritable, Text> writePTransform = HadoopIO.Write.to("hdfs://localhost:9000/tmp/output", outputFormatClass, LongWritable.class, Text.class); 

    textInput.apply(ParDo.of(new ParDoFn())).apply(writePTransform.withoutSharding()); 

    pipeline.run().waitUntilFinish(); 

} 
+0

あなたの質問に完全な例外スタックトレースを含めることができますか?これは確実に問題の絞り込みに役立ちます。また、Apache Beamの例でスタイルを踏襲することもできます。構築するトランスフォームは一度使用されます。それらをインライン化して、あなたのコードをもっと読みやすくすることができます。 –

答えて

3

?私の経験から、あなたが得ているエラーはSpark 2.x AccumulatorV2によってスローされ、Sparkランナーは現在Spark 1.6をサポートしています。

+0

あなたは正しいです! – zifanpan

+0

私はすでにSpark 1.6の問題を解決しました。 – zifanpan

+0

@zifanpanあなたはこれをどのように修正したのか説明できますか?あなたが提案した1.6.3と同じ依存関係のバージョンがありますが、これを修正できません。提案してください – Abhishek

1

org.apache.spark.util.AccumulatorV2を拡張するカスタムアキュムレータを作成したとき、同様の問題が発生しました。原因はoverride def isZero: Booleanメソッドでは不適切なロジックでした。したがって、基本的にcopyAndResetメソッドが呼び出されると、copy()が呼び出され、reset()isZero()がtrueを返します。あなたがチェックがされているところであるAccumulatorV2ソースを見れば は:

// Called by Java when serializing an object 
final protected def writeReplace(): Any = { 
if (atDriverSide) { 
    if (!isRegistered) { 
    throw new UnsupportedOperationException(
     "Accumulator must be registered before send to executor") 
    } 
    val copyAcc = copyAndReset() 
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 
    copyAcc.metadata = metadata 
    copyAcc 
} else { 
    this 
} 
} 

特にこの部分

val copyAcc = copyAndReset() 
assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 

それがお役に立てば幸いです。ハッピースパーク!

関連する問題