2016-11-09 3 views
7

Flinkストリーミング:開始要素と終了要素で定義されたウィンドウを実装する方法は?私は次の形式でデータを持っている

SIP | 2405463430 | 4115474257 | 8.205142580136622E12 |火11月8日夜04時58分58秒 2016 IST | 2405463430 | | 4115474257 | 8.205142580136622E12 | RTPをINVITE火11月8日 16:58:58 IST 2016 | 0 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue 11月8日16:58:58 IST 2016 | 1 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 | Tue Nov 08 16:58:58 IST 2016 | 2 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |火11月8日夜04時58分58秒 IST 2016 | 3 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |火 11月8日夜04時58分58秒IST 2016 | 4 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |火11月8日午前16時58分58秒 2016 IST | 5 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |火11月8日 IST 2016年午前16時58分58秒| 6 RTP | 2405463430 | 4115474257 | 8.205142580136622 E12 |火 11月8日午前16時58分58秒IST 2016 | 7 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |火11月8日午前16時58分58秒IST 2016 | 8 RTP | 2405463430 | 4115474257 | 8.205142580136622E12 |火11月8日 午後4時58分58秒IST 2016 | 9 SIP | 2405463430 | 4115474257 | 8.205142580136622E12 |火 11月8日午後4時58分58秒IST 2016 | BYE

私はSIP-INVITEメッセージに遭遇したときに私のウィンドウを起動したいですイベントがトリガーされたときメッセージが発生し、いくつかの集計を実行します。

どうすればよいですか? SIP-INVITEメッセージは、特定のユーザーの任意の時点に送信されます。また、同時に複数のユーザーが複数のメッセージを受信する可能性があります(SIP-INVITE)。

答えて

2

あなたは、ユーザーがキー入力したグローバルウィンドウでユースケースを解決できると思います。グローバルウィンドウはキーごとにすべてのデータを収集し、ウィンドウのトリガーとパージの責任をユーザー定義のTrigger関数にプッシュします。次のように

グローバルウインドウが定義されています:

val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker) 
val agg = input 
    // one global window per user (handles overlapping SIP-INVITE events). 
    .keyBy(_._1) 
    // collect all data for each user until the trigger fires and purges the window. 
    .window(GlobalWindows.create()) 
    // you have to implement a custom trigger which reacts on the marker. 
    .trigger(new YourCustomTrigger()) 
    // the window function computes your aggregation. 
    .apply(new YourWindowFunction()) 

私は次のように(SIP-INVITEイベントは、常にセッションを開始していることを仮定して)動作するはずないトリガーを考えます。 Trigger.onElement()メソッドは、SIP-BYEフィールドをチェックし、ウィンドウ評価をトリガーしてウィンドウをパージしなければならない。すなわち、TriggerResult.FIRE_AND_PURGEを返すべきである。これにより、評価関数が呼び出され、ウィンドウの状態が削除されます。

アウトオブオーダーイベントをサポートしたい場合は注意が必要です(この場合、タイムスタンプの前のすべてのデータが確実に受信されるように、イベントタイムタイマーを閉じるエレメントのタイムスタンプに設定する必要があります) )。 「SIP-INVITE」と「SIP-BYE」の間にないために破棄する必要のあるデータがある場合は、それも処理する必要があります。

詳細については、global windowsおよびtriggersのドキュメント、JavaDocsの[Trigger][3]、およびblog postのマニュアルを参照してください。

+0

ありがとう:)これは非常に役に立ちました! –

関連する問題