2017-09-19 9 views
0

のために出力を生成しないFlink 'timeWindow'操作 私は、出力ファイルをstdoutに送信しません。PopularPlacesFromKafkaのサンプルファイル

... 
// find popular places 
val popularSpots = rides 
    // match ride to grid cell and event type (start or end) 
    .map(new GridCellMatcher) 
    // partition by cell id and event type 
    .keyBy(k => k) 
    // build sliding window 
    .timeWindow(Time.minutes(15), Time.minutes(5)) 
    // count events in window 
    .apply{ (key: (Int, Boolean), window, vals, out: Collector[(Int, Long, Boolean, Int)]) => 
    out.collect((key._1, window.getEnd, key._2, vals.size)) 
    } 

// print result on stdout 
    popularSpots.print() 
... 

私は大丈夫、データがカフカから引き出されていることを確認しましたが、それは私が何も出力を得るない「timeWindow」操作を行うにしようとしたときに何かのようです。 'timeWindow'操作を削除すると、 'keyBy'データが出力されているのがわかります。私が行方不明になっていることは明らかですか?

答えて

0

誰もが同じ問題を抱えている場合は、これが私の問題でした。

私のkafkaトピックには複数のパーティションがありましたが、すべてのテストデータを1つのパーティション(0)に生成していました。何らかのデータを受信して​​しまい、演算子チェーンの下にウォーターマークを送信しない - ウィンドウ関数がデータを出力しなくなる(そのような状況ではProcessingTimeでうまく動作する理由もある)。フィードバックのための

https://issues.apache.org/jira/browse/FLINK-5479

0

ソースのスピードアップを適切に設定しましたか?デフォルトでは(スピードアップ係数なし)、ソースは元のデータをエミュレートします。つまり、元々生成されたのと同じレートでレコードを出力します。つまり、1分のデータを生成するのに1分かかります。

ウィンドウオペレータは、最後の15分間のデータを5分ごとに集計します。その結果、ウィンドウ演算子が最初の結果を生成するまで5分かかります。

スピードアップ係数を600に設定すると、1秒で10分のデータが得られます。

0

一般に、フリンクジョブが出力を生成しない可能性がある理由はいくつかありますが、非常に一般的な理由はウォーターマーク付けと関係があります。 Flinkのイベントタイムクロックは、現在のウォーターマークが進んだときにのみ進むので、ウォーターマークがなければ、イベントタイムウィンドウは決して起動しません。

Flinkトレーニングの場合は、タクシー乗り場から透かしが生成されます。しかし、代わりにKafkaソースを使用しているので、タイムスタンプ抽出器と透かし生成プログラムを実装してから、assignTimestampsAndWatermarksをストリームに呼び出す必要があります(documentation参照)。 A BoundedOutOfOrdernessTimestampExtractorここで、遅延はKafkaに書き込んだジョブによって構成された遅延とよく一致します。

+0

おかげで、私は参照しています同じコードは、あなたが言及しているものとはまだありません出力が含まれています。ここではそれについて、関連するJIRAのです。 – AJC