2017-11-15 20 views
0

私はキューとしてKafka 1.0を使用するアプリケーションを持っています。カフカのトピックには80のパーティションと80のコンシューマが走っています。 (Kafka-pythonの消費者)。コマンドを実行することによりカフカパーティションラグを増やす

私はパーティションの1つがオフセットで立ち往生し、新しいレコードが追加されているようタイムラグが連続的に増加していることを見
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe 

TOPIC       PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG  CONSUMER-ID          HOST 

118 mytopic      37   1924   2782   858  kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost 
119 mytopic      38   2741   2742   1   kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost 
120 mytopic      39   2713   2713   0   kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost 
121 mytopic      40   2687   2688   1   kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost 

は何がこれを原因:

上記のコマンドの出力は次のようになりますか?また、reset-offsetsコマンドを使用してオフセットをリセットすることは、このサーバーが定期的に手動で監視されない可能性があるため、望ましくありません。

クライアントは、LinuxのM/Cに並列プロセスとしてバックグラウンドで実行します。

consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092', 
        session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1, 
        auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024, 
        value_deserializer=lambda m: json.loads(m.decode('ascii'))) 

for message in consumer: 
    msg = json.loads(message.value) 
    process_message(msg) 

答えて

0

オフセット消費者がいくつかの時間後に移動されていない場合、消費者は 停止している可能性があります。消費者オフセットが移動しているが、消費者の遅れが (ログの終わりと消費者のオフセットの差)が である場合、消費者は生産者よりも遅い。コンシューマ が遅い場合、一般的な解決策はコンシューマの並列度を高めることです。これは、トピックのパーティション数を増やす必要があります。

続きを読むKafka docs

簡単に言えば、あなたは消費している以上のものを生産しています。遅れを減らすには、消費率を上げる必要があります。より多くの消費者を追加する必要があります。あなたがテストしているだけなら、あなたの消費者は遅いです。

+0

コンシューマは、バックグラウンドで実行されているkafka-pythonクライアントです。それが突然停止する理由。私はクライアントのインスタンスの数を確認し、それはうまく感じます。消費者を再起動することも問題を解決していないようです。 – ashdnik

+0

質問自体にクライアントコードを追加しました – ashdnik