2016-09-14 18 views
0

私は順序が乱れたストリームを持っています。それらを順序付けし、次のフレームの同じフィールドでフィールド値を合計する必要があります。私のコード:Flinkタイムスタンプ、計算されたフィールドをストリームに追加する

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
DataStream<Message> messageswithTS = messages.assignTimestampsAndWatermarks(new TimeLagWaterMarkGenerator()); 
DataStream<Message> SumNumber = messageswithTS 
      .keyBy("deviceId") 
      .map(new Sumalo()) 

Sumalo()は、追加する機能です。タイムスタンプを抽出するコード:

public class TimeLagWaterMarkGenerator implements AssignerWithPeriodicWatermarks<Message> { 

private static final long serialVersionUID = 1L; 
private long currentMaxTimestamp; 

@Override 
public long extractTimestamp(Message element, long previousElementTimestamp) { 
    long timestamp = element.getDate(); 
    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); 
    return timestamp; 
} 

@Override 
public Watermark getCurrentWatermark() { 
    return new Watermark(currentMaxTimestamp); 
} 
} 

結果:

1 TRUE 0 21 1473861657491 6af7ecfb-5122-48b6-ada1-0ea39d1d4740 
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87 
1 TRUE 1 29 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3 
1 FALSE 0 29 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3 
.... 

正しい結果:

1 TRUE 0 0 1473861657491 6af7ecfb-5122-48b6-ada1-0ea39d1d4740 
1 FALSE 3 3 1473861657496 c8b4617d-534b-4c5e-825c-a8c5556fcd87 
1 TRUE 1 4 1473861657497 f5b72056-ec3d-4c97-b86d-73ed728757c3 
1 FALSE 0 4 1473861657501 363d061d-ce02-4709-9683-b3bb233861f3 
.... 

は、任意のヘルプは理解されます。

答えて

1

FLINK はない自動的にイベント時間にイベント・タイム・ストリームを並べ替えず、またがどのイベント時にのみ可能だろう、すなわち、に(イベント時の流れをソートする演算子を提供していません順不同ストリームを順ストリームに変換する)。

ただし、AbstractStreamOperatorを拡張することで、このような演算子を自分で実装することができます。これは、イベント、割り当てられたタイムスタンプ、および受信されたウォーターマークにアクセスできる低レベルのインターフェイスです。オペレータは次のように動作することができます。これは、到着するすべての要素をイベント時間でソートされたヒープに挿入できます。ウォーターマークが到着すると、ウォーターマークよりも少ないタイムスタンプを持つすべての要素が放出されます。遅延要素が到着した場合(タイムスタンプが現在の透かしよりも小さい要素)、その要素を放出する(ストリームの完全な順序を破棄する)か、廃棄することができます。また、オペレータはヒープをFlink管理状態として保持することによってチェックポイントに参加する必要があります。このインターフェイスは非常に低レベルなので、Flinkの動作をよく理解している必要があります。さらに、マイナーバージョン間で変更される可能性があります。

タイムスタンプとウォーターマークアサイナに関しては、透かしに余裕を追加していません。この実装では、おそらく多くの遅れ要素があります。 BoundedOutOfOrdernessTimestampExtractorをご覧ください。

+0

Fabianさん、ありがとうございました。私はテーブルAPIとSQLの機能でそれを試しましたが、ストリームテーブルの集約は現在サポートされていません... – jag

+0

いいえ残念ながら、それは現時点でそれを実装する唯一の方法です。この機能は将来、テーブルAPIに追加されます。 –

関連する問題