簡単な処理でトピック間のストリーミングにKafka Streams v。0.10.2.0を使用しています。最近、ブローカーの1人がダウンしてカフカストリームアプリがシャットダウンし、手動で再起動するまで問題が発生しました。私は、まさにこの原因をログから理解することはできません。この問題をデバッグしようとすると、ここではログの抜粋は次のとおりです。すべての例外なくシャットダウンしているKafka Streamsアプリケーションの再起動
INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] partitions [[topicname-3, topicname-1, topicname-2]] revoked at the beginning of consumer rebalance.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 127.0.0.1:9092 dead for group streams-group
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator 127.0.0.1:9092 for group streams-group.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Committing consumer offsets of task 0_1
ERROR [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Failed while executing StreamTask 0_1 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[0_1, 0_2, 0_3]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]]
ERROR [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-group failed on partition revocation
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator dead for group streams-group
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator for group streams-group.
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata ...
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {...=[activeTasks: ([0_0, 0_4]) assignedTasks: ([0_0, 0_4]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.2], ...=[activeTasks: ([0_1, 0_2, 0_3]) assignedTasks: ([0_1, 0_2, 0_3]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.30000000000000004]}.
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Successfully joined group streams-group with generation 17
INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] New partitions [[topicname-3, topicname-1, topicname-2]] assigned at the end of consumer rebalance.
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_1] Initializing processor nodes of the topology
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_2] Initializing processor nodes of the topology
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_3] Initializing processor nodes of the topology
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Shutting down
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_1
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_2
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_3
INFO [StreamThread-1] o.a.k.c.p.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]]
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Stream thread shutdown complete
WARN [StreamThread-1] o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
まず、非常に簡単であるとするので、処理が長い時間を取っていたことは非常に考えにくいですアプリは数ヶ月間ログのようなメッセージなしで実行されていました。
ログから判断して、カフカストリームはグループに再結合しましたが、突然例外なくシャットダウンしました。私は別のマシン上で実行されている2つのストリームアプリを持っていて、ブローカーが再起動したときに同時に両方がシャットダウンされました。
この問題はどのようにデバッグできますか?少なくとも例外を投げてはいけませんか? もう1つの問題は、ストリームスレッドがシャットダウンしている間、残りのアプリケーションが正常に動作していたため、自動的に再起動されなかったことです。何とかこれをキャッチしてスレッドを再開できますか?保持ポリシーにより、消費者にとっては非常に望ましくないことがあります。どうすればkafkaストリームアプリをより信頼できるものにできますか?
ありがとうございます!