Flinkストリーミングワークフローは、Kafkaにメッセージを公開します。 KafkaProducerの「再試行」メカニズムは、内部バッファーにメッセージが追加されるまで開始されません。Fink:KafkaProducerデータ損失
その前に例外がある場合、KafkaProducerはその例外をスローし、Flinkはそれを処理していないようです。この場合、データが失われます。
関連FLINKコード(FlinkKafkaProducerBase):
if (logFailuresOnly) {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
}
acknowledgeMessage();
}
};
}
else {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null && asyncException == null) {
asyncException = exception;
}
acknowledgeMessage();
}
};
}
ここでは、シナリオの我々は、データ損失の原因となることを認識しているされています
すべてカフカブローカが停止しました。
この場合、メッセージをバッファに追加する前に、KafkaProducerはメタデータを取得しようとします。設定されたタイムアウトでKafkaProducerがメタデータを取得できない場合は、例外がスローされます。 (カフカ0.9.0.1ライブラリ内の既存のバグ)書き込み可能ではない
- メモリレコードの両方上記の例で
https://issues.apache.org/jira/browse/KAFKA-3594
、KafkaProducerは再試行しません、とFLINKは、メッセージを無視します。メッセージは記録されません。例外はありますが、失敗したメッセージはありません。
回避策(カフカ設定):
- メタデータタイムアウトのための非常に高い値(metadata.fetch.timeout.ms)
- バッファ満了のための非常に高い値(request.timeout.ms)
上記のカフカ設定を変更することによる副作用については、引き続き調査中です。
私たちの理解は正しいですか?または、いくつかのFlink設定を変更してこのデータ消失を回避できる方法はありますか?
ありがとうございました。
こんにちはアメジストIC。お返事をありがとうございます。ええ、私たちは上記のすべての設定を持っています。したがって、すべてのブローカーがダウンしても、データが失われることはありません。これは実際にFlinkのバグです。以下を確認してください:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html – Ninad
は、 "async (バージョン0.9未満)のパブリッシャーは、コールバック(ネットワーク、バッチ期限切れなど)のすべての失敗を捕まえることはできません。 – kisna
これらの設定は新しいプロデューサーの設定です。 – amethystic