でウィンドウ内の無制限のデータを消費します。固定のウィンドウを使用して、集計をBigQueryに書き込みます。は私が<a href="https://cloud.google.com/pubsub/docs" rel="nofollow noreferrer">Pub/Sub</a>トピック+サブスクリプションを持っているし、消費し、<a href="https://cloud.google.com/dataflow/docs" rel="nofollow noreferrer">Dataflow</a>にサブスクリプションからの無制限のデータを集計するデフォルトのトリガー
読み取りと書き込み(ウィンドウと集計なし)は正常に動作します。しかし、データを固定ウィンドウにパイプすると(各ウィンドウ内の要素を数えるために)、ウィンドウは決してになりません。したがって、集合体は記述されません。ここで
は私の言葉の出版社は(それが入力ファイルとしてexamplesからkinglear.txtを使用しています)です。ここで
public static class AddCurrentTimestampFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(System.currentTimeMillis()));
}
}
public static class ExtractWordsFn extends DoFn<String, String> {
@ProcessElement public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z']+");
for (String word:words){ if(!word.isEmpty()){ c.output(word); }}
}
}
// main:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
p.apply("ReadLines", TextIO.Read.from(o.getInputFile()))
.apply("Lines2Words", ParDo.of(new ExtractWordsFn()))
.apply("AddTimestampFn", ParDo.of(new AddCurrentTimestampFn()))
.apply("WriteTopic", PubsubIO.Write.topic(o.getTopic()));
p.run();
は私の窓付きワードカウンタです:
Pipeline p = Pipeline.create(o); // 'o' are the pipeline options
BigQueryIO.Write.Bound tablePipe = BigQueryIO.Write.to(o.getTable(o))
.withSchema(o.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
Window.Bound<String> w = Window
.<String>into(FixedWindows.of(Duration.standardSeconds(1)));
p.apply("ReadTopic", PubsubIO.Read.subscription(o.getSubscription()))
.apply("FixedWindow", w)
.apply("CountWords", Count.<String>perElement())
.apply("CreateRows", ParDo.of(new WordCountToRowFn()))
.apply("WriteRows", tablePipe);
p.run();
上記の加入者は動作しません。これは、ウィンドウがdefault triggerを使用してトリガーしていないように見えるためです。しかし、手動でトリガーを定義すると、コードが機能し、カウントがBigQueryに書き込まれます。
Window.Bound<String> w = Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes();
可能であれば、カスタムトリガーを指定しないようにしたいと思います。
質問:
- なぜ私のソリューションは、データフローのdefault triggerでは動作しませんか?
- default triggerを使用してウィンドウをトリガーするには、パブリッシャーまたはサブスクライバーを変更する必要がありますか?
ジョブIDを持っていますか? – jkff
トリガは、1回の発砲の後に「終了」し、それ以降のすべてのデータを削除します。デフォルトのトリガーと一致させるには、 'Repeatedly.forever(AfterWatermark.pastEndOfWindow())'(許可された遅延がゼロの場合には実際には繰り返されません)を指定します。 –
このトリガーでテストしました。しかし、透かしベースのものは、説明されたシナリオでは機能しません。 "CountWords"ステップの内部の 'Combine.GroupedValues'アクションは決して実行されません。パイプラインを10分以上実行したままにしても"CountWord"の 'GroupByKey'アクションは最後に実行されたアクションです。 Benが提案した 'timestampLabel'アプローチを調査/試します。 @jkff:ジョブID:2017-01-04_07_33_56-14457038567248221079 – Juve