0
サイド入力を使用すると、次のエラーが発生します。以下のモデルコードでサイド入力を使用するとエラーが発生する--SideInputsメソッドがKVタイプを入力として受け入れない
:
PCollectionView<Map<String, String>> view1= information
.apply(View.<String, String>asMap());
PCollection<KV<String, Position>> FileData;
FileData.apply("populate",
ParDo.of(new DoFn<KV<String, Position>, KV<String, Position>>() {
@ProcessElement
public void processElement(ProcessContext c) {
}.withSideInputs(view1));
withSideInputsメソッドが呼び出されたときにエラーが発生しました。 withsideinputは入力としてKVタイプの値を受け入れていません。私が逃していることを教えてください。
エラーメッセージ:
java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to java.lang.Iterable
at org.apache.beam.runners.core.SideInputHandler.addSideInputValue(SideInputHandler.java:142)
at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:225)
at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:207)
at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79)
at com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir.sweep(AbstractReservoir.java:413)
at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:269)
at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
問題を再現するサンプルコード:
public void testMapAsEntrySetSideInput() {
final PCollectionView<Map<String, Integer>> view =
pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
.apply(View.<String, Integer>asMap());
PCollection<KV<String, Integer>> output =
pipeline.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
ParDo.of(new DoFn<Integer, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
assertEquals((int) c.element(), c.sideInput(view).entrySet().size());
for (Entry<String, Integer> entry : c.sideInput(view).entrySet()) {
c.output(KV.of(entry.getKey(), entry.getValue()));
}
}
}).withSideInputs(view));
PAssert.that(output).containsInAnyOrder(
KV.of("a", 1), KV.of("b", 3));
pipeline.run();
}
あなたはApexランナーを使用しているようです。これは他のランナー、例えばダイレクトランナーと一緒に行われますか? – jkff
私はSpark runnerとApex runnerを使い、同じエラーに直面しています。 – VIjay
ダイレクトランナーでは起こりますか?そして、これは間違いなく、これらのランナーのバグのように思えます - これを再現して修正するために使用できる最小限のコード例を提供しようと思いますか? – jkff