2017-10-27 4 views
0

サイド入力を使用すると、次のエラーが発生します。以下のモデルコードでサイド入力を使用するとエラーが発生する--SideInputsメソッドがKVタイプを入力として受け入れない

PCollectionView<Map<String, String>> view1= information 
       .apply(View.<String, String>asMap()); 

PCollection<KV<String, Position>> FileData; 

FileData.apply("populate", 
ParDo.of(new DoFn<KV<String, Position>, KV<String, Position>>() { 
        @ProcessElement 
public void processElement(ProcessContext c) { 

}.withSideInputs(view1)); 

withSideInputsメソッドが呼び出されたときにエラーが発生しました。 withsideinputは入力としてKVタイプの値を受け入れていません。私が逃していることを教えてください。

エラーメッセージ:

java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be cast to java.lang.Iterable 
     at org.apache.beam.runners.core.SideInputHandler.addSideInputValue(SideInputHandler.java:142) 
     at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:225) 
     at org.apache.beam.runners.apex.translation.operators.ApexParDoOperator$2.process(ApexParDoOperator.java:207) 
     at com.datatorrent.api.DefaultInputPort.put(DefaultInputPort.java:79) 
     at com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir.sweep(AbstractReservoir.java:413) 
     at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:269) 
     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428) 

問題を再現するサンプルコード:

public void testMapAsEntrySetSideInput() { 

    final PCollectionView<Map<String, Integer>> view = 
     pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3))) 
      .apply(View.<String, Integer>asMap()); 

    PCollection<KV<String, Integer>> output = 
     pipeline.apply("CreateMainInput", Create.of(2 /* size */)) 
      .apply(
       "OutputSideInputs", 
       ParDo.of(new DoFn<Integer, KV<String, Integer>>() { 
        @ProcessElement 
        public void processElement(ProcessContext c) { 
        assertEquals((int) c.element(), c.sideInput(view).size()); 
        assertEquals((int) c.element(), c.sideInput(view).entrySet().size()); 
        for (Entry<String, Integer> entry : c.sideInput(view).entrySet()) { 
         c.output(KV.of(entry.getKey(), entry.getValue())); 
        } 
        } 
       }).withSideInputs(view)); 

    PAssert.that(output).containsInAnyOrder(
     KV.of("a", 1), KV.of("b", 3)); 

    pipeline.run(); 
    } 
+0

あなたはApexランナーを使用しているようです。これは他のランナー、例えばダイレクトランナーと一緒に行われますか? – jkff

+0

私はSpark runnerとApex runnerを使い、同じエラーに直面しています。 – VIjay

+0

ダイレクトランナーでは起こりますか?そして、これは間違いなく、これらのランナーのバグのように思えます - これを再現して修正するために使用できる最小限のコード例を提供しようと思いますか? – jkff

答えて

0

これは、Apexのランナーのバグだったし、ビームリリース2.3.0で解決されています。

関連する問題