2017-06-06 7 views
1

私はフリンクストリームを持っており、私はいくつかの時間ウィンドウで30秒と言ういくつかの事を較正しています。フリックストリーミングウィンドウトリガー

ここで私の結果は私の前のウィンドウを集約してくれます。

ではなく、私が 新しいというように最後のウィンドウ結果+を取得し、私は新鮮な結果が欲しい結果10.

次ティリ秒を取得する最初の30秒間言います。

私の質問はどのように私は各ウィンドウの新鮮な結果を得るかです。

+1

あなたのコードについてのご意見をお聞かせください。 – ImbaBalboa

答えて

2

役立つだろう。あなたが望むのはFIRE_AND_PURGE(ウィンドウのコンテンツを生成して削除する)、デフォルトのフリンクトリガーはFIRE(ウィンドウの内容を保持して保持する)です。より多くの深さでの説明のために

input 
    .keyBy(...) 
    .timeWindow(Time.seconds(30)) 

    // The important part: Replace the default non-purging ProcessingTimeTrigger 
    .trigger(new PurgingTrigger[..., TimeWindow](ProcessingTimeTrigger)) 

    .reduce(...) 

TriggersFIRE vs FIRE_AND_PURGEに顔をしています。

トリガは、ウィンドウアサイナによって形成されたウィンドウがウィンドウ機能によって処理される準備が整ったときを決定します。各WindowAssignerには、デフォルトのTriggerが付属しています。デフォルトのトリガーがニーズに合わない場合は、トリガー(...)を使用してカスタムトリガーを指定できます。

トリガが起動すると、FIREまたはFIRE_AND_PURGEのいずれかが発生します。 FIRE はウィンドウの内容を保持しますが、、FIRE_AND_PURGE は内容を削除します。既定では、事前に実装されたトリガは、ウィンドウの状態をパージすることなく単にFIREをトリガします。