AWS SQSの代わりにkafkaを試しています。主な動機は、kafkaが一度に10メッセージを引き出すという制約を256kbの上限でなくすパフォーマンスを向上させることです。ここでは、私のユースケースの高レベルのシナリオを示します。私は索引作成のための文書を送るクローラをたくさん持っています。ペイロードのサイズは平均で約1MBです。クローラはSOAPエンドポイントを呼び出し、プロデューサコードを実行してメッセージをカフカキューに送信します。コンシューマアプリはメッセージをピックアップして処理します。私のテストボックスでは、2つの複製を持つ30個のパーティションでトピックを構成しました。 2つのカフカインスタンスは1つの飼い猫インスタンスで実行されています。カフカのバージョンは0.10.0です。カフカのコンシューマ設定/パフォーマンスの問題
私のテストでは、700万のメッセージをキューに入れました。私は30個のコンシューマスレッドを持つコンシューマグループを作成しました。私は当初、これがSQS経由で取得したものに比べて処理能力を大幅に向上させるという印象を受けました。残念ながら、そうではありませんでした。私の場合、データの処理は複雑で、完了するまで平均で1〜2分かかります。スレッドが時間通りにハートビートすることができないため、パーティションの再調整が必要になります。グループがすでにリバランスし、他のメンバーに パーティションが割り当てられているので を完了することができないコミット:私はオートグループfull_groupのために失敗したコミットオフセット
を引用し、ログ内のメッセージの束を見ることができました。つまり、 以降のpoll()呼び出しは、コンフィグレーションされた session.timeout.msよりも長くなりました。これは通常、ポーリングループが のメッセージ処理に時間がかかっていることを意味します。セッションタイムアウトを増やすか、max.poll.recordsを使用してpoll()で返された バッチの最大サイズを減らすことで、 のいずれかに対処できます。
これは、同じメッセージが複数回処理される原因となります。これを避けるために、セッションタイムアウト、max.poll.records、およびポーリング時間を試してみましたが、全体的な処理の時間が遅くなりました。ここに設定パラメータのいくつかがあります。
metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest
私は消費者ポーリング時間を100ミリ秒に減らしました。これは、再調整の問題を軽減し、重複した処理を排除しましたが、全体のプロセスを大幅に遅らせました。 SQSベースのソリューションを使用して25時間に比べて6百万のメッセージすべてを処理するのに35時間かかることになりました。各消費者スレッドは平均して50〜60件のメッセージを収集しましたが、そのうちのいくつかは時々0レコードをポーリングしました。私はパーティション内で利用可能な膨大な量のメッセージがある場合、この動作についてはわかりません。同じスレッドが後続の反復中にメッセージを受け取ることができました。これはリバランスのためでしょうか?
私の消費者コードです
while (true) {
try{
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
if(record.value()!=null){
TextAnalysisRequest textAnalysisObj = record.value();
if(textAnalysisObj!=null){
// Process record
PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
}
}
}
}catch(Exception ex){
LOGGER.error("Error in Full Consumer group worker", ex);
}
私の場合はレコード処理部分が一つのボトルネックであることをご理解ください。しかし、私は、ここ数人が大きな処理時間を扱うのに似たユースケースを持っていると確信しています。私は、専用のスレッドで各プロセッサを回転させたり、大容量のスレッドプールを使用して非同期処理を行うことを考えましたが、システムに大きな負荷がかかるかどうかはわかりません。同時に、私は、人々が再バランスの問題を避けるために処理を実行するために一時停止と再開APIを使用した2つのインスタンスを見てきました。
私は実際にこの状況でいくつかのアドバイス/ベストプラクティスを探しています。特に、kafkaが私のユースケースに適したツールでない場合は、ヒアビート、リクエストタイムアウト、最大ポーリングレコード、自動コミット間隔、ポーリング間隔などの設定をお勧めします。