私はconsumer.poll()メソッドを使用しているときに問題に直面しています。poll()メソッドを使用してデータを取得した後、コンシューマにコミットするデータがないので、カフカのトピックから特定の行数を削除してください。kafkaトピックから特定の行数を削除するにはpythonを使用するか、inbuiltメソッドを使用しますか?
答えて
コンシューマに障害が発生した場合のデータ損失を避けるため、コミットする前にデータが完全に処理されていることを確認する必要があります。
あなたがauto.commit
を有効にする場合は、次のpoll()
を発行する前に、各poll()
が暗黙のうちにその前のpoll()
からすべてのデータをコミットしているためこのように、完全にpoll()
後にすべてのデータを処理していることを確認してください。
これが可能でない場合は、consumer.commit(...)
でデータが完全に処理された後にauto.commit
を無効にし、手動でコミットする必要があります。このためには、各メッセージを個別にコミットする必要はなく、オフセットがX
のコミットは、オフセットが< X
のすべてのメッセージを暗黙的にコミットすることを覚えておいてください(たとえば、オフセット5のメッセージを処理した後、最後に正常に処理されたメッセージではなく、処理したい次のメッセージです)。オフセット6をコミットすると、すべてのメッセージが0から5のオフセットでコミットされます。したがって、より小さいオフセットを持つすべてのメッセージが完全に処理される前に、オフセット6をコミットしないでください。
ありがとうございます。私のpoll()メソッドは1000行を取り込み、消費者から1000行を自動的に削除します。別のサーバーが最初の500行を再度処理します。しかし、私の状況では、バケツを100行ごとに作成するようにバケツに分けているので、そのバケットの後に処理するように送ります。この場合、出力のデータは複製されます。 – surya
はい。カフカは最低1回の処理しか保証しておらず、故障の場合は重複する可能性があります。まだ一度の処理はありません。 IIRCでは、これもここで議論されています:http://docs.confluent.io/current/clients/consumer.html –
Btw:あなたは何も "削除"していません...コミットした後、データはまだカフカにあります。対応するオフセットに対して 'seek()'を実行した場合には、再度読み込むことができます。 –
- 1. Kafka REST Proxyを使用してKafkaトピックを削除するには?
- 2. Kafka:保持 "compact"のトピックからメッセージを削除する
- 3. System.IO.Deleteを使用してディレクトリから特定のファイルを削除しますか?
- 4. sqlから特定の行を削除する(sqliteでAndroidを使用)
- 5. jQueryを使用して特定の行を削除する
- 6. バッチファイルを使用して特定の行と前の行をテキストファイルから削除しますか?
- 7. Pythonを使用して引用キーに基づいてbibtexファイルから特定のエントリを削除する
- 8. TSQLを使用して特定の値のXMLをノードから削除する
- 9. JQueryを使用して特定のテーブルから列を削除します
- 10. Javaを使用してKafkaからACLSを削除する方法
- 11. Python。どのメッセージをキュー/トピックから削除するかActiveMQ
- 12. ボタンを使用して特定の行にはない行を削除する
- 13. どちらが良いですか? inbuiltのpython関数またはos.systemコマンドを使用していますか?
- 14. 未使用のメソッドをxcode iosから削除する
- 15. zookeeper-shall.sh rmrブローカー/トピックを使用したトピックの削除とkafka10のkafka-topics.shのフラグの削除
- 16. ユニットテストを使用してソースクラスから特定のメソッドを実行
- 17. laravelを使用してテーブルから行を削除する
- 18. スウィフトを使用してテキストファイルから1行を削除する
- 19. Sedを使用してテキストファイルから行を削除する
- 20. ListView(WPF)を使用してデータベースから行を削除する
- 21. チェックボックスを使用してデータベーステーブルから行を削除する
- 22. javascriptを使用してテーブルから行を削除する
- 23. jpaを使用してデータベースから行を削除する
- 24. PHPを使用してSQLテーブルから行を削除する
- 25. awkを使用してファイルから特定の列を削除する方法はありますか?
- 26. kafka-nodeのトピックを削除する方法はありますか
- 27. リストのPythonを使用して文字列から数値を削除する
- 28. ワイルドカードを使用してLinuxの行から特定のパターンを削除するには
- 29. OpenXMLを使用してExcelから数式を削除する
- 30. Pythonを使用してcsvファイルから空行を削除する
あなたの質問を理解できません。ただし、カフカのトピックは追加のみで、手動では削除できません。データが削除される唯一の方法は、ログ保持またはログ圧縮です。 –
@Matthias J. Saxにお返事ありがとうございます。しかし実際は私の問題は私がconsumer.poll()を使用している間です。特定の量のデータを取得しますが、プログラムが失敗した場合、新しいサーバはワードの最初の行から読み込みを開始し、コミット "がTrueの場合、1つのサーバが故障した場合にデータが失われます – surya