ParDo.of(new ParDoFn())
をPCollection
に適用したとき、このプログラムはこの例外をスローします。しかし、.apply(ParDo.of(new ParDoFn()))
を削除すると、プログラムは正常です。あなたが上で実行されているスパークバージョンAssertionError:アサーションに失敗しました:copyAndResetはゼロ値のコピーを返す必要があります
// SparkRunner
private static void testHadoop(Pipeline pipeline){
Class<? extends FileInputFormat<LongWritable, Text>> inputFormatClass =
(Class<? extends FileInputFormat<LongWritable, Text>>)
(Class<?>) TextInputFormat.class;
@SuppressWarnings("unchecked") //hdfs://localhost:9000
HadoopIO.Read.Bound<LongWritable, Text> readPTransfom_1 = HadoopIO.Read.from("hdfs://localhost:9000/tmp/kinglear.txt",
inputFormatClass,
LongWritable.class,
Text.class);
PCollection<KV<LongWritable, Text>> textInput = pipeline.apply(readPTransfom_1)
.setCoder(KvCoder.of(WritableCoder.of(LongWritable.class), WritableCoder.of(Text.class)));
//OutputFormat
@SuppressWarnings("unchecked")
Class<? extends FileOutputFormat<LongWritable, Text>> outputFormatClass =
(Class<? extends FileOutputFormat<LongWritable, Text>>)
(Class<?>) TemplatedTextOutputFormat.class;
@SuppressWarnings("unchecked")
HadoopIO.Write.Bound<LongWritable, Text> writePTransform = HadoopIO.Write.to("hdfs://localhost:9000/tmp/output", outputFormatClass, LongWritable.class, Text.class);
textInput.apply(ParDo.of(new ParDoFn())).apply(writePTransform.withoutSharding());
pipeline.run().waitUntilFinish();
}
あなたの質問に完全な例外スタックトレースを含めることができますか?これは確実に問題の絞り込みに役立ちます。また、Apache Beamの例でスタイルを踏襲することもできます。構築するトランスフォームは一度使用されます。それらをインライン化して、あなたのコードをもっと読みやすくすることができます。 –