2017-03-21 11 views
0

私はKafka 0.10.2.0を使用しています。私には3人のブローカーがいて、フェールオーバーテストをしています。ときどきカフカブローカーの1人が堂々と閉じられたとき、私はデータを失っています。 カフカのブローカ構成:カフカブローカーでデータを失うのを防ぐ方法

zookeeper.connection.timeout.ms=6000 
num.partitions=50 
min.insync.replicas=2 
unclean.leader.election.enable=false 
group.max.session.timeout.ms=10000 
group.min.session.timeout.ms=1000 

消費者の設定:

props.put(ConsumerConfig.GROUP_ID_CONFIG, getTopicName() + "group"); 
props.put(ConsumerConfig.CLIENT_ID_CONFIG, getClientId()); 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); 
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500); 
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 3000); 

プロデューサーの構成:

props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
props.put(ProducerConfig.CLIENT_ID_CONFIG, getClientId()); 
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 800); 
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 800); 

私はカフカブローカーを介してデータを失う停止するように何ができますか?

+0

のACK = ALL'を参照してくださいhttp://kafka.apache.org/documentation/#producerconfigsの 'configセットプロデューサーであります –

+0

私はすでにプロデューサーにacks allを追加しました。 – melihcoskun

+0

不正なリーダー選挙が無効になっていて、acks = allの場合、データを失ってはいけません。このシナリオを確実に再現できる場合は、潜在的なバグを[email protected](http://kafka.apache.org/contact)に報告するか、Jiraを開いてください:https://issues.apache.org/jira/browse/KAFKA-1?jql = project%20%3D%20KAFKA –

答えて

1

あなたのトピックに3の複製係数がありますか?

いくつかの素晴らしいアドバイス最後の年からカフカサミットhttps://www.slideshare.net/ConfluentInc/when-it-absolutely-positively-has-to-be-there-reliability-guarantees-in-kafka-gwen-shapira-jeff-holoman?from_m_app=ios

セッションのビデオはここhttps://www.confluent.io/kafka-summit-2016-ops-when-it-absolutely-positively-has-to-be-there/

+0

はい、すべてのトピックにレプリケーションファクタ3があります。私はautocommit falseを使用していますが、私はメッセージをコミットする上で何の問題もありません。私はプロデューサーがメッセージをトピックに正しく送信していると確信しています。しかし、その後ブローカーがダウンしたとき、私はデータを失っています。それはいつも起こっているのではなく、次のようなテストをするときに:カフカにメッセージを送信し、ブローカーの1つを閉じてからもう一度開いて、他のブローカーを閉じてください...そのケースに関するデータが失われています。 – melihcoskun

+0

データが失われていることをどのように知っていますか?コンソールからコンシューマを使用して最初からのメッセージを消費する場合、Kafkaログにはないデータがありますか?元のブローカに戻ってもメッセージはまだそこにありますか? –

+0

私は最初からオフセットを開始していません。私はcommitSync.Strategy = earliestで手動コミットを使用しています。ログにはすでにブローカーに配信されたメッセージが表示されるため、データが失われていることがわかります。だから私はカフカの話題についてのメッセージを持っていて、カフカブローカーをもう一度始めました。私がポールメソッドを呼び出すときに消費者側では、いくつかのメッセージがスキップされているのがわかります。オフセット位置が変更された可能性がありますが、どうすればよいですか? – melihcoskun

関連する問題