2017-09-26 14 views
3

flink kafkaのコンシューマとストリーム(kafka msgsがトピックにストリーミングされている)を持っていて、面白い振る舞いに気づいています。Flink:ストリームの最後にウィンドウが処理されない

データがストリーミングされているときに、ウィンドウが「完了」する前に停止した場合、またはデータが終了して(数回のウィンドウ後に)ウィンドウの最後に到達しない場合、残りのパイプラインはトリガーしないでください。

例の流れ:

env.addSource(kafkaConsumer) 
     .flatMap(new TokenMapper()) 
     .keyBy("word") 
     .window(TumblingEventTimeWindows.of(Time.seconds(10L))) 
     .reduce(new CountTokens()) 
     .flatMap(new ConvertToString()) 
     .addSink(producer); 

私はTimeCharacteristicがイベント時刻に設定するENVでFlinkKafkaConsumer010を使用しています。そしてconsumer.assignTimestampsAndWatermarks(新しいPeriodicWatermarks())

private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>{ 

    private long currentMaxTimestamp; 
    private final long maxOutOfOrderness; 

    public PeriodicWatermarksAuto(long maxOutOfOrderness){ 
     this.maxOutOfOrderness = maxOutOfOrderness; 
    } 

    @Override 
    public Watermark getCurrentWatermark() { 
     return new Watermark(currentMaxTimestamp - maxOutOfOrderness); 
    } 

    @Override 
    public long extractTimestamp(String t, long l) { 
     // this should be the event timestamp 
     currentMaxTimestamp = l; 
     logger.info("TIMESTAMP: " + l); 
     return l; 
    } 
} 

私のウィンドウが10秒を言っている、と私のデータストリームは、データのみの8秒が含まれています(その後、しばらくの間ストリーミングを停止した)場合は、flatMap-新後データにストリーミングされるまで>シンクは、処理しない

例データストリーム処理問題:例えば、私は35であった。同様場合

 xxxxxxxx(8secs)------(gap)--(later more data)xxxxx 
     ^(not processed)   (until I get here)^ 

(各Xは秒 当たりのデータの一部です)秒ストリーミングdの価値ata(と私のウィンドウは10秒です)、データトリガーは3ウィンドウ分、残りの5秒分のデータは処理されません。

ついに
 ...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx 
     (processed)  ^(not processed)   (until I get here)^ 

私の窓は10秒であり、私は唯一のはflatmap->が起こることはありませんシンクストリームデータの5秒を持って場合。

私の質問は、ですが、しばらくしてからデータが表示されない場合、処理するウィンドウデータをトリガーする方法はありますか?

私のデータがライブストリーミングされている場合は、データが伸びていないことがわかります。最後のウィンドウ(わずか5秒分のデータ)が不確定なものを待たなければならない新しいデータが入るまでは、ウィンドウ時間が経過した後の最後のウィンドウの結果が必要になります。

これはProcessingTimeの代わりにEventTimeを使用しているか、ウォーターマークが実際にトリガーする最後のウィンドウに対して適切に生成されていないことが原因と考えられます。ストリームが最後のビットをトリガーしない場合、これは誰にとっても問題になると思います。私はおそらくend-of-stream msgを送ることができると言いますが、ソースがストリームを壊すのでストリームが終了してもこれは役に立ちません。

EDIT:私は処理時間を変更し、最後のウィンドウでデータを正しく処理するので、EventTimeがafterallの原因であると思われます。カスタムトリガーまたは適切なウィンドウ透かしが答えかもしれません。 。

ありがとうございました!

答えて

2

ウォーターマークに関連して私が思ったように、これを後世に残しておきます。タイムスタンプとウォーターメーカー(assignTimestampsAndWatermarksから)は 'getCurrentWatermark()'を呼び出し、ウォーターマークを固定エンティティ(タイムスタンプ - 最大オフセット)に設定していたため、新しいエンティティが見つかるまで更新されませんでした。

私の解決策は、設定可能な時間内にデータが見られなかった場合、最終的にウォーターマークを次のウィンドウに進めるためのタイマーです。私は非常に潜在的なデータを処理することはできませんが、私はこれが問題であるとは思わない。これは、EventTime処理の意図された動作です。

+0

時間を処理するためにカスタムトリガーをウィンドウに追加することができます。 eventTimeウィンドウは、このような理由から、カスタム処理時間トリガーと組み合わせて使用​​されることがよくあります。 – Jicaar

+0

それは私が考えていたものです。私はそれを回避策で動作させましたが、私はカスタムトリガが最善であることに同意します。 –

関連する問題