Flinkで初めてのリアルタイム解析ジョブを作成しようとしました。このアプローチはカッパアーキテクチャのようなものなので、カフカには生データがあり、エンティティの状態の変化ごとにメッセージを受け取ることができます。Flinkでストリーミングでステートフルなエンティティの最新の状態をカウントする
ので、メッセージの形式は次の通り:
(id,newStatus, timestamp)
我々は、すべての時間ウィンドウのために、与えられた状況にあるアイテムの数を計算したいです。したがって、出力は次の形式にする必要があります。
(outputTimestamp, state1:count1,state2:count2 ...)
または同等のものを使用してください。これらの行には、指定されたステータスの項目の数が常に含まれている必要があります。ここで、Idに関連付けられたステータスは、そのIDに対して観測された最新のメッセージです。イベントが処理されるよりも古い場合でも、idのステータスはどの場合でもカウントされるべきです。したがって、すべてのカウントの合計は、システムで観測される異なるIDの数と等しくなければなりません。次のステップは、しばらくしてから最後のアイテムのアイテムを忘れる可能性がありますが、これは現在厳しい要件ではありません。
これはelasticsearchで書かれ、次に照会されます。
私は多くの異なるパスを試しましたが、どれも完全に要件を満たしていませんでした。スライディングウインドウを使用すると、予想されるように、スライディングウインドウの開始がイベントのタイムスタンプを超えたときに、カウントのために失われたという点を除いて、期待される動作を簡単に達成することができました。データが一度に処理されたときに失敗したキーとタイムスタンプでいくつかのやりとりをしたので、他の方法ではバックログを扱う際に一貫性がなくなりました。
私はこの問題にどのように接近すべきか、高いレベルで知りたいと思います。これは比較的一般的なユースケースのように見えますが、特定のIDの関連情報を無期限に保持してエンティティを正しくカウントする必要があるという事実は、多くの問題を引き起こします。次のように、実際の状態の変化を導き出す
val stateUpdates: DataStream[(Long, Int, ts)] = ???
:よう(id, state, time)
のDataStream
を考えると
:
私はあなたの提案に取り組んでいます。ありがとうございます。私がここで紛失しているのは、 'YourWindowFunction'がすべきことです。私はイベントの時間の概念を持っていないので、私はタイムスタンプを割り当てることができません。また、このソリューションは、処理時間で動作しているようですが、私はイベント時間を気にしています。私はまだそれを実行することができませんでしたが、私が得たものについては、これは私が必要とするものとは少し異なります。 – Chobeat
これはイベント時にも機能します。 exec環境変数に正しい 'TimeCharacteristics'を設定し、タイムスタンプ+ウォーターマークを割り当てる必要があります。唯一の時間依存の操作はウィンドウです。 'YourWindowFunction'はウィンドウのタイムスタンプを割り当てます。 'WindowFunction.apply()'には、ウィンドウの開始時刻と終了時刻にアクセスできる 'TimeWindow'パラメータがあります。 [docs](https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation)を参照してください。 –
'TimeCharacteristics'はすでに設定されていましたが、このレイアウトでタイムスタンプを割り当てる方法がわかりません。アップデートと一緒にタイムスタンプを付けるべきですか?何かのような(ステータス、カウント、タイムスタンプ)? – Chobeat