2017-05-03 21 views
2

DataflowRunnerでMap PCollectionViewを作成できません。Dataflow Mapのサイド入力の問題

以下のパイプラインでは、未入力のcountingInputとサイド入力(生成された10個の値を含む)の値が集計されています。 gcpでパイプラインを実行すると、View.asMap()トランスフォームの内部に滞留します。 具体的には、ParDo(StreamingPCollectionViewWriter)には出力がありません。

データフロー2.0.0-beta3とbeam-0.7.0-SNAPSHOTでこれを試しましたが、結果はまったくありませんでした。ローカルのDirectRunnerを使用しているときに、私のパイプラインが問題なく実行されていることに注意してください。

何か間違っていますか? すべてのサポートは、私を助けるために事前に感謝します!

public class SimpleSideInputPipeline { 

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class); 

    public interface Options extends DataflowPipelineOptions {} 

    public static void main(String[] args) throws IOException { 
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 
     Pipeline pipeline = Pipeline.create(options); 

     final PCollectionView<Map<Integer, String>> sideInput = pipeline 
       .apply(CountingInput.forSubrange(0L, 10L)) 
       .apply("Create KV<Integer, String>",ParDo.of(new DoFn<Long, KV<Integer, String>>() { 
        @ProcessElement 
        public void processElement(ProcessContext c) { 
         c.output(KV.of(c.element().intValue(), "TEST")); 
        } 
       })) 
       .apply(View.asMap()); 

     pipeline 
      .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(5))) 
      .apply("Aggregate with side-input",ParDo.of(new DoFn<Long, KV<Long, String>>() { 
       @ProcessElement 
       public void processElement(ProcessContext c) { 
        Map<Integer, String> map = c.sideInput(sideInput); 

        //get first segment from map 
        Object[] values = map.values().toArray(); 
        String firstVal = (String) values[0]; 
        LOG.info("Combined: K: "+ c.element() + " V: " + firstVal + " MapSize: " + map.size()); 
        c.output(KV.of(c.element(), firstVal)); 
       } 
      }).withSideInputs(sideInput)); 

     pipeline.run(); 
    } 
} 

答えて

1

ParDo(StreamingPCollectionViewWriterFn)は、任意の出力を記録していないことを心配する必要はありません - それは実際に内部の場所に各要素を書き込みされて何をしますか。

あなたのコードはわかりました。これは調査する必要があります。私はBEAM-2155を提出しました。

関連する問題