2016-10-06 55 views
0

Google Dataflowを使用して、ヒューマン・イン・ザループ形式の補完を調整し、3つのフォームが完了した後に競合がないかどうかを確認する予定でした。私はDataflowのソースとシンクの両方に対してGoogle PubSubをセットアップしており、トリガーの発射をして、JobIdの3つのフォームが受信された後にPubSubシンクに送信したいだけです。Google DataflowトリガーがelementCountAtLeastに達する前に到達

This SO post私が解決しようとしていた問題と似ていましたが、実装すると、トリガーが発射され、AfterPane.elementCountAtLeastに達する前に出力がPubSubシンクに送信されています。

私はGlobalWindowSlidingWindowで試してみました。 elementCountAtLeastに達した後にトリガーを受けたら、jobIdGroupByKeyを実装する予定でした。しかし、私がそのステップに移る前に、elementCountAtLeastを孤立して働かせたいと思っています。ここで

はPubSubのとSlidingWindowから読み取るためのコードです:

PCollection<String> humanInTheLoopInput; 
humanInTheLoopInput = pipeline 
    .apply(PubsubIO.Read 
      .named("ReadFromHumanInTheLoopSubscription") 
      .subscription(options.getInputHumanInTheLoopRawSubscription())); 

PCollection<String> windowedInput = humanInTheLoopInput 
    .apply(Window 
      .<String>into(SlidingWindows 
         .of(Duration.standardSeconds(30)) 
         .every(Duration.standardSeconds(5))) 
      .<String>triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(3))) 
      .discardingFiredPanes() 
      .withAllowedLateness(Duration.standardDays(10))); 

答えて

2

トリガされているGroupByKey何もせずに。ウィンドウ処理とトリガ処理は、グループ化(および結合)操作にのみ影響します。

関連する問題