2016-09-30 7 views
2

Flinkで初めてのリアルタイム解析ジョブを作成しようとしました。このアプローチはカッパアーキテクチャのようなものなので、カフカには生データがあり、エンティティの状態の変化ごとにメッセージを受け取ることができます。Flinkでストリーミングでステートフルなエンティティの最新の状態をカウントする

ので、メッセージの形式は次の通り:

(id,newStatus, timestamp) 

我々は、すべての時間ウィンドウのために、与えられた状況にあるアイテムの数を計算したいです。したがって、出力は次の形式にする必要があります。

(outputTimestamp, state1:count1,state2:count2 ...) 

または同等のものを使用してください。これらの行には、指定されたステータスの項目の数が常に含まれている必要があります。ここで、Idに関連付けられたステータスは、そのIDに対して観測された最新のメッセージです。イベントが処理されるよりも古い場合でも、idのステータスはどの場合でもカウントされるべきです。したがって、すべてのカウントの合計は、システムで観測される異なるIDの数と等しくなければなりません。次のステップは、しばらくしてから最後のアイテムのアイテムを忘れる可能性がありますが、これは現在厳しい要件ではありません。

これはelasticsearchで書かれ、次に照会されます。

私は多くの異なるパスを試しましたが、どれも完全に要件を満たしていませんでした。スライディングウインドウを使用すると、予想されるように、スライディングウインドウの開始がイベントのタイムスタンプを超えたときに、カウントのために失われたという点を除いて、期待される動作を簡単に達成することができました。データが一度に処理されたときに失敗したキーとタイムスタンプでいくつかのやりとりをしたので、他の方法ではバックログを扱う際に一貫性がなくなりました。

私はこの問題にどのように接近すべきか、高いレベルで知りたいと思います。これは比較的一般的なユースケースのように見えますが、特定のIDの関連情報を無期限に保持してエンティティを正しくカウントする必要があるという事実は、多くの問題を引き起こします。次のように、実際の状態の変化を導き出す

val stateUpdates: DataStream[(Long, Int, ts)] = ??? 

:よう(id, state, time)DataStreamを考えると

答えて

3

私は私があなたの問題の解決策を持っていると思う

val stateCntUpdates: DataStream[(Int, Int)] = s // (state, cntUpdate) 
    .keyBy(_._1) // key by id 
    .flatMap(new StateUpdater) 

StateUpdaterですステートフルFlatMapFunction。これは、各IDの最後の状態を格納するキー状態を持っています。各入力レコードに対して、2つの状態カウント更新レコード、すなわち(oldState, -1),(newState, +1)を返します。 (oldState, -1)レコードは、以前の状態の数が減少することを保証します。

val cntUpdatesPerWindow: DataStream[(Int, Int, Long)] = stateCntUpdates // (state, cntUpdate, time) 
    .keyBy(_._1) // key by state 
    .timeWindow(Time.minutes(10)) // window should be non-overlapping, e.g. Tumbling 
    .apply(new SumReducer(), new YourWindowFunction()) 

SumReducer合計cntUpdatesとYourWindowFunctionがあなたの窓のタイムスタンプを割り当てます。

次はあなたが国家や窓あたりの状態数の変化を集約します。このステップでは、ウィンドウ内の各状態のすべての状態変化を集計します。

最後に、カウントの更新で現在のカウントを調整します。

val stateCnts: DataStream[(Int, Int, Long)] = cntUpdatesPerWindow // (state, count, time) 
    .keyBy(_._1) // key by state again 
    .map(new CountUpdater) 

CountUpdaterステートフルMapFunctionです。これには、各状態の現在のカウントを格納するキー付き状態があります。受信レコードごとに、カウントが調整され、レコード(state, newCount, time)が発行されます。

これで、各状態(新しい状態ごとに1つのレコード)の新しいカウントを持つストリームが作成されました。可能であれば、これらのレコードを使用してElasticsearchインデックスを更新できます。一定の時間にすべての状態数を収集する必要がある場合は、ウィンドウを使用してそれを行うことができます。

注::このプログラムの状態サイズは、ユニークなIDの数によって異なります。 id空間が非常に高速になると、問題が発生する可能性があります。

+0

私はあなたの提案に取り組んでいます。ありがとうございます。私がここで紛失しているのは、 'YourWindowFunction'がすべきことです。私はイベントの時間の概念を持っていないので、私はタイムスタンプを割り当てることができません。また、このソリューションは、処理時間で動作しているようですが、私はイベント時間を気にしています。私はまだそれを実行することができませんでしたが、私が得たものについては、これは私が必要とするものとは少し異なります。 – Chobeat

+0

これはイベント時にも機能します。 exec環境変数に正しい 'TimeCharacteristics'を設定し、タイムスタンプ+ウォーターマークを割り当てる必要があります。唯一の時間依存の操作はウィンドウです。 'YourWindowFunction'はウィンドウのタイムスタンプを割り当てます。 'WindowFunction.apply()'には、ウィンドウの開始時刻と終了時刻にアクセスできる 'TimeWindow'パラメータがあります。 [docs](https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation)を参照してください。 –

+0

'TimeCharacteristics'はすでに設定されていましたが、このレイアウトでタイムスタンプを割り当てる方法がわかりません。アップデートと一緒にタイムスタンプを付けるべきですか?何かのような(ステータス、カウント、タイムスタンプ)? – Chobeat

関連する問題