2017-04-14 9 views
5

カフカストリームで手動でコミットする方法はありますか?私は手動でコミット呼んでいるカフカストリームで手作業でコミットするには?

while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(100); 
    for (ConsumerRecord<String, String> record : records){ 
     // process records 
    } 
    consumer.commitAsync(); 
} 

通常KafkaConsumerを使用して、私は以下のような何かを行います。 KStreamの同様のAPIは表示されません。

答えて

12

コミットはStreamsによって内部的に完全に自動処理されるため、通常は手動でコミットする必要はありません。実際には、自動コミットは内部的に使用されるコンシューマに対しては無効であり、Streamsは「手動で」コミットを管理します。その理由は、処理中の特定のポイントでのみコミットが発生し、データが失われないようにするためです(状態の更新と結果のフラッシュに関して多くの内部依存関係が存在するため)。

より頻繁にコミットするには、StreamsConfigパラメータcommit.interval.msを使用してコミット間隔を減らすことができます。

ただし、低レバープロセッサAPIを使用して間接的に手動コミットが可能です。 init()メソッドを介して提供されるcontextオブジェクトを使用してcontext#commit()を呼び出すことができます。これは、できるだけ早くコミットするための「ストリームへのリクエスト」であり、直接コミットを発行していないことに注意してください。

関連する問題