私はこのストリームをセッションしたい:1,1,1,2,2,2,2,2,3,3,3,3,3,3,3,3,3,3,3,5、。 。これらのセッションへ:Apache Flinkでストリームをセッション化する方法は?
1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5
私は、ストリーム要素は、(そうで、2〜3 3 0にして)1から2に変更したときに検出し、トリガーを起動するCustomTriggerを書いてきました。しかし、これは解決策ではありません。なぜなら、2の最初の要素を処理し、トリガを起動すると、ウィンドウは[1,1,1,2]になりますが、1の最後の要素でトリガを起動する必要があるからです。ここで
は私のカスタムトリガークラスの私のonElement機能のpesudoです:
override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
if (prevState == element.value) {
prevState = element.value
TriggerResult.CONTINUE
} else {
prevState = element.value
TriggerResult.FIRE
}
}
どのように私はこの問題を解決することができますか?