私はFlink 1.2-Snapshotを使用しています。Flinkストリーミングウィンドウ - 各ウィンドウの最後のイベントは次のウィンドウに属します
- ID = 25398102、ソースID = 1、TS = 2016年10月15日午前〇時00分56秒、ユーザー= 14、値= 919
- ID = 25398185、ソースID =:私のデータは以下のようになります。 1、ts = 2016-10-15 00:01:06、user = 14、value = 920
- id = 25398210、sourceId = 1、ts = 2016-10-15 00:01:16、user = 14、値= 944
- ID = 25398235、ソースID = 1、TS = 2016年10月15日午前0時01分24秒、ユーザー= 3149、値= 944
- ID = 25398236、ソースID = 1、TS = 2016から10 -15 00:01:25、ユーザー= 71、値= 955
- id = 25398239、sour ts = 2016-10-15 00:01:36、user = 71、value = 955
- id = 25398265、sourceId = 1、ts = 2016-10-15 00:01:36、user = 71、値= 955
- ID = 25398310、ソースID = 1、TS = 2016年10月15日午後12時02分16秒、ユーザー= 14、値= 960
- ID = 25398320、ソースID = 1、TS = 2016 -10-15午後12時02分26秒、ユーザー= 14、値= 1000
私はWindowsベースのユーザーIDを作成するには、次のコードを実行しています:
stream.flatMap(new LogsParser())
.assignTimestampsAndWatermarks(new MessageTimestampExtractor())
.keyBy("sourceId")
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(new MySessionTrigger()))
.apply(new SessionWindowFunction())
.print();
MySessionトリガーはRECEに見えますユーザーIDを確認して、ユーザーIDの変更時にウィンドウをトリガーします。 SessionWindowFunctionは、ウィンドウからセッションを作成するだけです。
ここで作成されたセッションである:
セッション:
- ID = 25398102、ソースID = 1、TS = 2016年10月15日午後12時00分56秒、ユーザー= 14、値= 919
- ID = 25398185、ソースID = 1、TS = 2016年10月15日午後12時01分06秒、ユーザー= 14、値= 920
- ID = 25398210、ソースID = 1、TS = 2016から10 -15 00:01:16、ユーザー= 14、値= 944
- ID = 25398235、ソースID = 1、TS = 2016年10月15日0時01分24秒、ユーザー= 3149、値= 944
セッション:
- ID = 25398236、ソースID = 1、ts = 2016-10-15 00:01:25、user = 71、value = 955
- id = 25398239、sourceId = 1、ts = 2016-10-15 00:01:26、user = 71、値= 955
- ID = 25398265、ソースID = 1、TS = 2016年10月15日午前0時01分36秒、ユーザー= 71、値= 955
- ID = 25398310、ソースID = 1、TS = 2016から10 -15 00:02:16、ユーザー= 14、値= 960
セッション:
- ID = 25398320、ソースID = 1、TS = 2016年10月15日午前0時02分26秒、ユーザー= 14、値= 1000
問題は、すべてのセッションで最後のイベントが実際に次のウィンドウに属していることがわかります。最後のイベントが既にウィンドウに表示されているため、ウィンドウをトリガーする決定はやや遅れています。
このウィンドウの最後のイベントを考慮せずにウィンドウをトリガーするにはどうすればよいですか?