2017-06-06 13 views
1

カウントが100に達した後、または5秒ごとに処理が完了した後に、Windowsを終了したいですか?つまり、要素が100に達したときにWindows計算をトリガーしますが、要素が100に達しても時間が5秒経過した場合、以下の2つのトリガーの組み合わせと同様にWindows計算がトリガーされます。FlinkでTriggerとCountとProcess Timeの両方を組み合わせることはできますか?

.countWindow(100)

.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

答えて

1

現在FLINKのAPIでこれを行うには、スーパー簡単な方法はありませんあります。

ユースケースには、状態(カウント用)とタイマーの組み合わせが必要です。カスタムTriggerを使用するか、ProcessFunctionを使用してウィンドウでこれを達成できます。

Windowsとカスタムトリガーのアプローチについては、ProcessingTimeTriggerとCountTriggerのimplementationsを見ると、基本的に2つをブレンドしたいので便利です。

ProcessFunctionは、マネージドステートとタイマーを組み合わせた低レベルのビルディングブロックです。これは必要なものです。Flink's managed stateの操作方法がすでにわかっている場合は、これがおそらく簡単でしょう。

ところで、online Flink training materialsには、カスタムトリガの実装とProcessFunctionの使用に関するスライドと演習が含まれています。

トリガ:slidesexercise
ProcessFunction:slidesexercise

+0

は、あなたの親切に助けをどうもありがとうございます、私はいくつかの困難を持っている場合、私はあなたの材料のためのいくつかのチェックを行い、その応答ここだろうでしょう。 –

+0

ここで私の例がhttp://dataartisans.github.io/flink-training/dataStream/5-intro.htmlがhttps://ci.apache.org/projects/flink/の公式の例と異なっていることに気付きましたflim-docs-release-1.3/dev/stream/process_function.html onTimer関数の場合、タイムスタンプ== result.lastModified + XXX)がありますが、例ではXXXはありません。 –

+0

また、ctx.timestamp()の意味は何ですか?それがctxに割り当てられた場所はどこから来ていますか?公式の例から、ctx.timestamp()は60秒ごとに変化するようですが、それは透かしですか? –

関連する問題