私はキューとしてKafka 1.0を使用するアプリケーションを持っています。カフカのトピックには80のパーティションと80のコンシューマが走っています。 (Kafka-pythonの消費者)。コマンドを実行することによりカフカパーティションラグを増やす
:
私はパーティションの1つがオフセットで立ち往生し、新しいレコードが追加されているようタイムラグが連続的に増加していることを見./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
。
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST
118 mytopic 37 1924 2782 858 kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic 38 2741 2742 1 kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic 39 2713 2713 0 kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic 40 2687 2688 1 kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost
は何がこれを原因:
上記のコマンドの出力は次のようになりますか?また、reset-offsetsコマンドを使用してオフセットをリセットすることは、このサーバーが定期的に手動で監視されない可能性があるため、望ましくありません。
クライアントは、LinuxのM/Cに並列プロセスとしてバックグラウンドで実行します。
consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
msg = json.loads(message.value)
process_message(msg)
コンシューマは、バックグラウンドで実行されているkafka-pythonクライアントです。それが突然停止する理由。私はクライアントのインスタンスの数を確認し、それはうまく感じます。消費者を再起動することも問題を解決していないようです。 – ashdnik
質問自体にクライアントコードを追加しました – ashdnik