2016-11-06 5 views
0

私はFlink 1.2-Snapshotを使用しています。Flinkストリーミングウィンドウ - 各ウィンドウの最後のイベントは次のウィンドウに属します

  • ID = 25398102、ソースID = 1、TS = 2016年10月15日午前〇時00分56秒、ユーザー= 14、値= 919
  • ID = 25398185、ソースID =:私のデータは以下のようになります。 1、ts = 2016-10-15 00:01:06、user = 14、value = 920
  • id = 25398210、sourceId = 1、ts = 2016-10-15 00:01:16、user = 14、値= 944
  • ID = 25398235、ソースID = 1、TS = 2016年10月15日午前0時01分24秒、ユーザー= 3149、値= 944
  • ID = 25398236、ソースID = 1、TS = 2016から10 -15 00:01:25、ユーザー= 71、値= 955
  • id = 25398239、sour ts = 2016-10-15 00:01:36、user = 71、value = 955
  • id = 25398265、sourceId = 1、ts = 2016-10-15 00:01:36、user = 71、値= 955
  • ID = 25398310、ソースID = 1、TS = 2016年10月15日午後12時02分16秒、ユーザー= 14、値= 960
  • ID = 25398320、ソースID = 1、TS = 2016 -10-15午後12時02分26秒、ユーザー= 14、値= 1000

私はWindowsベースのユーザーIDを作成するには、次のコードを実行しています:

stream.flatMap(new LogsParser()) 
      .assignTimestampsAndWatermarks(new MessageTimestampExtractor()) 
      .keyBy("sourceId") 
      .window(GlobalWindows.create()) 
      .trigger(PurgingTrigger.of(new MySessionTrigger())) 
      .apply(new SessionWindowFunction()) 
      .print(); 

MySessionトリガーはRECEに見えますユーザーIDを確認して、ユーザーIDの変更時にウィンドウをトリガーします。 SessionWindowFunctionは、ウィンドウからセッションを作成するだけです。

ここで作成されたセッションである:

  1. セッション:

    • ID = 25398102、ソースID = 1、TS = 2016年10月15日午後12時00分56秒、ユーザー= 14、値= 919
    • ID = 25398185、ソースID = 1、TS = 2016年10月15日午後12時01分06秒、ユーザー= 14、値= 920
    • ID = 25398210、ソースID = 1、TS = 2016から10 -15 00:01:16、ユーザー= 14、値= 944
    • ID = 25398235、ソースID = 1、TS = 2016年10月15日0時01分24秒、ユーザー= 3149、値= 944
  2. セッション:

    • ID = 25398236、ソースID = 1、ts = 2016-10-15 00:01:25、user = 71、value = 955
    • id = 25398239、sourceId = 1、ts = 2016-10-15 00:01:26、user = 71、値= 955
    • ID = 25398265、ソースID = 1、TS = 2016年10月15日午前0時01分36秒、ユーザー= 71、値= 955
    • ID = 25398310、ソースID = 1、TS = 2016から10 -15 00:02:16、ユーザー= 14、値= 960
  3. セッション:

    • ID = 25398320、ソースID = 1、TS = 2016年10月15日午前0時02分26秒、ユーザー= 14、値= 1000

問題は、すべてのセッションで最後のイベントが実際に次のウィンドウに属していることがわかります。最後のイベントが既にウィンドウに表示されているため、ウィンドウをトリガーする決定はやや遅れています。

このウィンドウの最後のイベントを考慮せずにウィンドウをトリガーするにはどうすればよいですか?

答えて

0

ユーザーIDが変更されるたびにフラットマップを使用してストリームにマーカーを挿入する方法が考えられます。これらのマーカーのいずれかが表示されると、トリガー機能が起動し、セッションウィンドウ機能によってマーカーが除外されます。

関連する問題