2017-07-17 19 views
0

適切なウィンドウ関数/アサイナを選択できません。タスクは次のとおりです。まず、request_idといくつかのデータを持つSourceからデータを取得し、外部データベースに対して非同期要求を実行します。Apache Flinkの非同期リクエストとウィンドウ

// Here String is for request_id, Data is for treated data 
DataStream Tuple2<String, Data> stream = ... 

// async I/O queries 
DataStream<Tuple2<String, String>> resultStream = 
AsyncDataStream.unorderedWait(
    stream, 
    new AsyncDatabaseRequest(), 
    1000, 
    TimeUnit.MILLISECONDS, 
    100 
); 

ここで、すべてのデータをrequest_idで収集して計算します。

DataStream Tuple2<String, Integer> = result 
    .map(val -> new Tuple2<String, Integer>(val.f0, val.f1.data_int)) 
    .keyBy(0) 
    .window(...) 
    .sum(1); 

問題はウィンドウ機能です。私は各ウィンドウが同じrequest_idを持つすべてのデータポイントを構成する必要がありますが、非同期クエリの時間はミリ秒から分に変わることがあります。一方、私は低い待ち時間が必要なので、ProcessingTimeSessionWindows.withGap(Time.minutes(10))を使用することはできません。非同期関数から最後のデータを取得するとすぐに計算を実行する必要があります。

私にとって最良ののは、非同期関数のウィンドウ透かしを使用することです。非同期関数は、各クエリがいつ終了し、どのマッハポイントがそれを保持するかを確かに知っています。これは可能ですか?そのような作業のベストプラクティスは何ですか?

答えて

0

まあ、私は解決策を見つけました。それは非常に簡単です。 私はEventTimeを使用します。私はタイムアウトを避けるこのように

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
DataStream<...> dataStream = ...; 

DataStream<...> newStream = dataStream 
    .keyBy(0) 
    .timeWindow(Time.milliseconds(1)) 
    .reduce(new Reducer()); 

と結果はすぐに準備:ストリームの流れで

Long ts = System.currentTimeMillis(); 
ctx.collectWithTimestamp(data, ts); 
ctx.emitWatermark(new Watermark(ts + 1)); 

私はイベント時刻の関数を使用します。私のソース関数では、私は次のようにイベントが透かしだけでなく、タイムスタンプを生成します。

関連する問題