2017-02-02 11 views
0

Flinkストリーミングワークフローは、Kafkaにメッセージを公開します。 KafkaProducerの「再試行」メカニズムは、内部バッファーにメッセージが追加されるまで開始されません。Fink:KafkaProducerデータ損失

その前に例外がある場合、KafkaProducerはその例外をスローし、Flinkはそれを処理していないようです。この場合、データが失われます。

関連FLINKコード(FlinkKafkaProducerBase):

if (logFailuresOnly) { 
      callback = new Callback() { 
       @Override 
       public void onCompletion(RecordMetadata metadata, Exception e) { 
        if (e != null) { 
         LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); 
        } 
        acknowledgeMessage(); 
       } 
      }; 
     } 
     else { 
      callback = new Callback() { 
       @Override 
       public void onCompletion(RecordMetadata metadata, Exception exception) { 
        if (exception != null && asyncException == null) { 
         asyncException = exception; 
        } 
        acknowledgeMessage(); 
       } 
      }; 
     } 

ここでは、シナリオの我々は、データ損失の原因となることを認識しているされています

  1. すべてカフカブローカが停止しました。

    この場合、メッセージをバッファに追加する前に、KafkaProducerはメタデータを取得しようとします。設定されたタイムアウトでKafkaProducerがメタデータを取得できない場合は、例外がスローされます。 (カフカ0.9.0.1ライブラリ内の既存のバグ)書き込み可能ではない

  2. - メモリレコードの両方上記の例で

https://issues.apache.org/jira/browse/KAFKA-3594

、KafkaProducerは再試行しません、とFLINKは、メッセージを無視します。メッセージは記録されません。例外はありますが、失敗したメッセージはありません。

回避策(カフカ設定):

  1. メタデータタイムアウトのための非常に高い値(metadata.fetch.timeout.ms)
  2. バッファ満了のための非常に高い値(request.timeout.ms)

上記のカフカ設定を変更することによる副作用については、引き続き調査中です。

私たちの理解は正しいですか?または、いくつかのFlink設定を変更してこのデータ消失を回避できる方法はありますか?

ありがとうございました。

答えて

0

ここで私はあなたの質問を考えています。 カフカの一つは、最初の保証を参照してください:複製因子Nとの話題については

は、我々はログにコミットすべてのレコードを失うことなく、N-1サーバーの障害まで耐えます。

まず、ログにコミットされたメッセージまたはレコードが気になります。配信されなかったレコードは、コミットされたものとして見なされません。第二に、すべてのブローカーがダウンした場合、データが失われます。

  • block.on.buffer:以下

    設定は、私たちが生産者側のデータの損失を防ぐために使用するものです。真=フル

  • のACK =の代わりに送信するすべての
  • リトライ= MAX_VALUE
  • max.in.flight.requests.per.connection = 1
  • 使用KafkaProducer.send(レコード、コールバック)(レコード)
  • unclean.leader.election.enable = falseを
  • replication.factor>はmin.insync.replicas
  • min.insync.replicas> 1
+0

こんにちはアメジストIC。お返事をありがとうございます。ええ、私たちは上記のすべての設定を持っています。したがって、すべてのブローカーがダウンしても、データが失われることはありません。これは実際にFlinkのバグです。以下を確認してください:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html – Ninad

+0

は、 "async (バージョン0.9未満)のパブリッシャーは、コールバック(ネットワーク、バッチ期限切れなど)のすべての失敗を捕まえることはできません。 – kisna

+0

これらの設定は新しいプロデューサーの設定です。 – amethystic