2017-01-03 21 views
0

でウィンドウ内の無制限のデータを消費します。固定のウィンドウを使用して、集計をBigQueryに書き込みます。は私が<a href="https://cloud.google.com/pubsub/docs" rel="nofollow noreferrer">Pub/Sub</a>トピック+サブスクリプションを持っているし、消費し、<a href="https://cloud.google.com/dataflow/docs" rel="nofollow noreferrer">Dataflow</a>にサブスクリプションからの無制限のデータを集計するデフォルトのトリガー

読み取りと書き込み(ウィンドウと集計なし)は正常に動作します。しかし、データを固定ウィンドウにパイプすると(各ウィンドウ内の要素を数えるために)、ウィンドウは決して​​になりません。したがって、集合体は記述されません。ここで

は私の言葉の出版社は(それが入力ファイルとしてexamplesからkinglear.txtを使用しています)です。ここで

public static class AddCurrentTimestampFn extends DoFn<String, String> { 
    @ProcessElement public void processElement(ProcessContext c) { 
     c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis())); 
    } 
} 

public static class ExtractWordsFn extends DoFn<String, String> { 
    @ProcessElement public void processElement(ProcessContext c) { 
     String[] words = c.element().split("[^a-zA-Z']+"); 
     for (String word:words){ if(!word.isEmpty()){ c.output(word); }} 
    } 
} 

// main: 
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options 
p.apply("ReadLines", TextIO.Read.from(o.getInputFile())) 
     .apply("Lines2Words", ParDo.of(new ExtractWordsFn())) 
     .apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn())) 
     .apply("WriteTopic", PubsubIO.Write.topic(o.getTopic())); 
p.run(); 

は私の窓付きワードカウンタです:

Pipeline p = Pipeline.create(o); // 'o' are the pipeline options 

BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o)) 
     .withSchema(o.getSchema()) 
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND); 

Window.Bound<String> w = Window 
     .<String>into(FixedWindows.of(Duration.standardSeconds(1))); 

p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription())) 
     .apply("FixedWindow", w) 
     .apply("CountWords", Count.<String>perElement()) 
     .apply("CreateRows", ParDo.of(new WordCountToRowFn())) 
     .apply("WriteRows", tablePipe); 
p.run(); 

上記の加入者は動作しません。これは、ウィンドウがdefault triggerを使用してトリガーしていないように見えるためです。しかし、手動でトリガーを定義すると、コードが機能し、カウントがBigQueryに書き込まれます。

Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))) 
     .triggering(AfterProcessingTime 
       .pastFirstElementInPane() 
       .plusDelayOf(Duration.standardSeconds(1))) 
     .withAllowedLateness(Duration.ZERO) 
     .discardingFiredPanes(); 

可能であれば、カスタムトリガーを指定しないようにしたいと思います。

質問:

  1. なぜ私のソリューションは、データフローのdefault triggerでは動作しませんか?
  2. default triggerを使用してウィンドウをトリガーするには、パブリッシャーまたはサブスクライバーを変更する必要がありますか?
+0

ジョブIDを持っていますか? – jkff

+0

トリガは、1回の発砲の後に「終了」し、それ以降のすべてのデータを削除します。デフォルトのトリガーと一致させるには、 'Repeatedly.forever(AfterWatermark.pastEndOfWindow())'(許可された遅延がゼロの場合には実際には繰り返されません)を指定します。 –

+0

このトリガーでテストしました。しかし、透かしベースのものは、説明されたシナリオでは機能しません。 "CountWords"ステップの内部の 'Combine.GroupedValues'アクションは決して実行されません。パイプラインを10分以上実行したままにしても"CountWord"の 'GroupByKey'アクションは最後に実行されたアクションです。 Benが提案した 'timestampLabel'アプローチを調査/試します。 @jkff:ジョブID:2017-01-04_07_33_56-14457038567248221079 – Juve

答えて

1

どのようにトリガーを決定している発射したことがありませんか?

あなたPubSubIO.WritePubSubIO.Read変換は、あなたがPubSubのに書き込まれません追加したそれ以外の場合は、タイムスタンプをwithTimestampLabelを使用して、タイムスタンプのラベルを指定する必要があり、両方とパブリッシュ時間が使用されます。

いずれかの方法で、パイプラインの入力透かしはPubSubの中で待機している要素のタイムスタンプから導出されます。すべての入力が処理されると、それはリアルタイムに進む前に数分間(出版社に遅れがあった場合)戻っています。あなたが見て可能性がありますどのような

は(入力ファイルはかなり小さいので)すべての要素が同じ〜1秒のウィンドウで公開されているということです。これらはすべて比較的早く読み込まれ、処理されますが、投入された1秒間のウィンドウは、入力された透かしが進んでからその1秒間のウィンドウ内のすべてのデータが消費されるまでトリガーされません。

これは数分間経過するまで発生しないため、トリガーが機能していないように見える場合があります。あなたが書いたトリガは、処理時間が1秒後に開始されましたが、これははるか早く発生しますが、すべてのデータが処理されたという保証はありません。デフォルトのトリガーから、より良い動作を取得するために

ステップ:

  1. 使用withTimestampLabel書き込みの両方にとのpubsubの手順をお読みください。
  2. パブリッシャーにタイムスタンプをさらに広げてもらう(たとえば、数分間実行し、その範囲にタイムスタンプを広げる)
関連する問題

 関連する問題