2017-06-18 5 views
1

私はこのストリームをセッションしたい:1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,3,3,3,3,5、。 。これらのセッションへ:Apache Flinkでストリームをセッション化する方法は?

1,1,1 
2,2,2,2,2 
3,3,3,3,3,3,3 
0 
3,3,3 
5 

私は、ストリーム要素は、(そうで、2〜3 3 0にして)1から2に変更したときに検出し、トリガーを起動するCustomTriggerを書いてきました。しかし、これは解決策ではありません。なぜなら、2の最初の要素を処理し、トリガを起動すると、ウィンドウは[1,1,1,2]になりますが、1の最後の要素でトリガを起動する必要があるからです。ここで

は私のカスタムトリガークラスの私のonElement機能のpesudoです:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = { 
    if (prevState == element.value) { 
     prevState = element.value 
     TriggerResult.CONTINUE 
    } else { 
     prevState = element.value 
     TriggerResult.FIRE 
    } 
} 

どのように私はこの問題を解決することができますか?

答えて

2

ListStateFlatMapFunctionは、このユースケースを実装する最も簡単な方法だと思います。

新しい要素が到着すると(つまり、flatMap()メソッドが呼び出されたとき)、値が変更されたかどうかを確認します。値が変更されていない場合は、要素を状態に追加します。値が変更された場合は、現在のリスト状態をセッションとして発行し、リストを消去し、最初のリスト状態として新しい要素を挿入します。

ただし、要素の順序が保持されていることを前提としています。 Flinkは、パーティション内で、つまり要素がシャッフルされず、すべての演算子が同じ並列性で実行される限り、確実に行います。

関連する問題