2016-10-06 3 views
4

AWS SQSの代わりにkafkaを試しています。主な動機は、kafkaが一度に10メッセージを引き出すという制約を256kbの上限でなくすパフォーマンスを向上させることです。ここでは、私のユースケースの高レベルのシナリオを示します。私は索引作成のための文書を送るクローラをたくさん持っています。ペイロードのサイズは平均で約1MBです。クローラはSOAPエンドポイントを呼び出し、プロデューサコードを実行してメッセージをカフカキューに送信します。コンシューマアプリはメッセージをピックアップして処理します。私のテストボックスでは、2つの複製を持つ30個のパーティションでトピックを構成しました。 2つのカフカインスタンスは1つの飼い猫インスタンスで実行されています。カフカのバージョンは0.10.0です。カフカのコンシューマ設定/パフォーマンスの問題

私のテストでは、700万のメッセージをキューに入れました。私は30個のコンシューマスレッドを持つコンシューマグループを作成しました。私は当初、これがSQS経由で取得したものに比べて処理能力を大幅に向上させるという印象を受けました。残念ながら、そうではありませんでした。私の場合、データの処理は複雑で、完了するまで平均で1〜2分かかります。スレッドが時間通りにハートビートすることができないため、パーティションの再調整が必要になります。グループがすでにリバランスし、他のメンバーに パーティションが割り当てられているので を完了することができないコミット:私はオートグループfull_groupのために失敗したコミットオフセット

を引用し、ログ内のメッセージの束を見ることができました。つまり、 以降のpoll()呼び出しは、コンフィグレーションされた session.timeout.msよりも長くなりました。これは通常、ポーリングループが のメッセージ処理に時間がかかっていることを意味します。セッションタイムアウトを増やすか、max.poll.recordsを使用してpoll()で返された バッチの最大サイズを減らすことで、 のいずれかに対処できます。

これは、同じメッセージが複数回処理される原因となります。これを避けるために、セッションタイムアウト、max.poll.records、およびポーリング時間を試してみましたが、全体的な処理の時間が遅くなりました。ここに設定パラメータのいくつかがあります。

 
metadata.max.age.ms = 300000 
max.partition.fetch.bytes = 1048576 
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] 
enable.auto.commit = true 
max.poll.records = 10000 
request.timeout.ms = 310000 
heartbeat.interval.ms = 100000 
auto.commit.interval.ms = 1000 
receive.buffer.bytes = 65536 
fetch.min.bytes = 1 
send.buffer.bytes = 131072 
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer 
group.id = full_group 
retry.backoff.ms = 100 
fetch.max.wait.ms = 500 
connections.max.idle.ms = 540000 
session.timeout.ms = 300000 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
metrics.sample.window.ms = 30000 
auto.offset.reset = latest 
私は消費者ポーリング時間を100ミリ秒に減らしました。これは、再調整の問題を軽減し、重複した処理を排除しましたが、全体のプロセスを大幅に遅らせました。 SQSベースのソリューションを使用して25時間に比べて6百万のメッセージすべてを処理するのに35時間かかることになりました。各消費者スレッドは平均して50〜60件のメッセージを収集しましたが、そのうちのいくつかは時々0レコードをポーリングしました。私はパーティション内で利用可能な膨大な量のメッセージがある場合、この動作についてはわかりません。同じスレッドが後続の反復中にメッセージを受け取ることができました。これはリバランスのためでしょうか?

私の消費者コードです

 
while (true) { 
    try{ 
     ConsumerRecords records = consumer.poll(100); 
     for (ConsumerRecord record : records) { 
      if(record.value()!=null){ 
       TextAnalysisRequest textAnalysisObj = record.value(); 
       if(textAnalysisObj!=null){ 
        // Process record 
        PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); 
       } 
      } 
     } 
    }catch(Exception ex){ 
     LOGGER.error("Error in Full Consumer group worker", ex); 
    } 
私の場合はレコード処理部分が一つのボトルネックであることをご理解ください。しかし、私は、ここ数人が大きな処理時間を扱うのに似たユースケースを持っていると確信しています。私は、専用のスレッドで各プロセッサを回転させたり、大容量のスレッドプールを使用して非同期処理を行うことを考えましたが、システムに大きな負荷がかかるかどうかはわかりません。同時に、私は、人々が再バランスの問題を避けるために処理を実行するために一時停止と再開APIを使用した2つのインスタンスを見てきました。

私は実際にこの状況でいくつかのアドバイス/ベストプラクティスを探しています。特に、kafkaが私のユースケースに適したツールでない場合は、ヒアビート、リクエストタイムアウト、最大ポーリングレコード、自動コミット間隔、ポーリング間隔などの設定をお勧めします。

答えて

2

カフカから読み込むスレッドとは別のスレッドで、メッセージを非同期に処理することから開始できます。この方法で自動コミットは非常に高速になり、カフカはあなたのセッションを切断しません。このような何か:

private final BlockingQueue<TextAnalysisRequest> requests = 
new LinkedBlockingQueue(); 

読み取りスレッドで:

while (true) { 
    try{ 
     ConsumerRecords records = consumer.poll(100); 
     for (ConsumerRecord record : records) { 
      if(record.value()!=null){ 
       TextAnalysisRequest textAnalysisObj = record.value(); 
       if(textAnalysisObj!=null){ 
        // Process record 
        requests.offer(textAnalysisObj); 
       } 
      } 
    }  
} 
catch(Exception ex){ 
    LOGGER.error("Error in Full Consumer group worker", ex); 
} 

処理スレッドで:

  while (!Thread.currentThread().isInterrupted()) { 
       try { 
        TextAnalysisRequest textAnalysisObj = requests.take(); 
        PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); 
       } catch (InterruptedException e) { 
        LOGGER.info("Process thread interrupted", e); 
        Thread.currentThread().interrupt(); 
       } catch (Throwable t) { 
        LOGGER.warn("Unexpected throwable while processing.", t); 
       } 
      } 

が大きなメッセージを送信するための戦略のために、このドキュメントでも見てみましょうカフカを通して:http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

要するに、カフカはbあなたが大きなメッセージを送信する必要がある場合は、それらをネットワークストレージに置き、カフカを介して自分の位置だけを送信するか、分割する方が良いでしょう。

関連する問題