2016-11-21 1 views
0

私はconsumer.poll()メソッドを使用しているときに問題に直面しています。poll()メソッドを使用してデータを取得した後、コンシューマにコミットするデータがないので、カフカのトピックから特定の行数を削除してください。kafkaトピックから特定の行数を削除するにはpythonを使用するか、inbuiltメソッドを使用しますか?

+0

あなたの質問を理解できません。ただし、カフカのトピックは追加のみで、手動では削除できません。データが削除される唯一の方法は、ログ保持またはログ圧縮です。 –

+0

@Matthias J. Saxにお返事ありがとうございます。しかし実際は私の問題は私がconsumer.poll()を使用している間です。特定の量のデータを取得しますが、プログラムが失敗した場合、新しいサーバはワードの最初の行から読み込みを開始し、コミット "がTrueの場合、1つのサーバが故障した場合にデータが失われます – surya

答えて

0

コンシューマに障害が発生した場合のデータ損失を避けるため、コミットする前にデータが完全に処理されていることを確認する必要があります。

あなたがauto.commitを有効にする場合は、次のpoll()を発行する前に、各poll()が暗黙のうちにその前のpoll()からすべてのデータをコミットしているためこのように、完全にpoll()後にすべてのデータを処理していることを確認してください。

これが可能でない場合は、consumer.commit(...)でデータが完全に処理された後にauto.commitを無効にし、手動でコミットする必要があります。このためには、各メッセージを個別にコミットする必要はなく、オフセットがXのコミットは、オフセットが< Xのすべてのメッセージを暗黙的にコミットすることを覚えておいてください(たとえば、オフセット5のメッセージを処理した後、最後に正常に処理されたメッセージではなく、処理したい次のメッセージです)。オフセット6をコミットすると、すべてのメッセージが0から5のオフセットでコミットされます。したがって、より小さいオフセットを持つすべてのメッセージが完全に処理される前に、オフセット6をコミットしないでください。

+0

ありがとうございます。私のpoll()メソッドは1000行を取り込み、消費者から1000行を自動的に削除します。別のサーバーが最初の500行を再度処理します。しかし、私の状況では、バケツを100行ごとに作成するようにバケツに分けているので、そのバケットの後に処理するように送ります。この場合、出力のデータは複製されます。 – surya

+0

はい。カフカは最低1回の処理しか保証しておらず、故障の場合は重複する可能性があります。まだ一度の処理はありません。 IIRCでは、これもここで議論されています:http://docs.confluent.io/current/clients/consumer.html –

+0

Btw:あなたは何も "削除"していません...コミットした後、データはまだカフカにあります。対応するオフセットに対して 'seek()'を実行した場合には、再度読み込むことができます。 –

関連する問題