2017-07-10 17 views
0

簡単な処理でトピック間のストリーミングにKafka Streams v。0.10.2.0を使用しています。最近、ブローカーの1人がダウンしてカフカストリームアプリがシャットダウンし、手動で再起動するまで問題が発生しました。私は、まさにこの原因をログから理解することはできません。この問題をデバッグしようとすると、ここではログの抜粋は次のとおりです。すべての例外なくシャットダウンしているKafka Streamsアプリケーションの再起動

INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] partitions [[topicname-3, topicname-1, topicname-2]] revoked at the beginning of consumer rebalance. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task's topology 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1 
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 127.0.0.1:9092 dead for group streams-group 
INFO [kafka-coordinator-heartbeat-thread | streams-group] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator 127.0.0.1:9092 for group streams-group. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Committing consumer offsets of task 0_1 
ERROR [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Failed while executing StreamTask 0_1 due to commit consumer offsets: 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[0_1, 0_2, 0_3]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]] 
ERROR [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group streams-group failed on partition revocation 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator dead for group streams-group 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator for group streams-group. 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group streams-group 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Constructed client metadata ... 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor 
INFO [StreamThread-1] o.a.k.s.p.i.StreamPartitionAssignor - stream-thread [StreamThread-1] Assigned tasks to clients as {...=[activeTasks: ([0_0, 0_4]) assignedTasks: ([0_0, 0_4]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.2], ...=[activeTasks: ([0_1, 0_2, 0_3]) assignedTasks: ([0_1, 0_2, 0_3]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.30000000000000004]}. 
INFO [StreamThread-1] o.a.k.c.c.i.AbstractCoordinator - Successfully joined group streams-group with generation 17 
INFO [StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [topicname-3, topicname-1, topicname-2] for group streams-group 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] New partitions [[topicname-3, topicname-1, topicname-2]] assigned at the end of consumer rebalance. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_1] Initializing processor nodes of the topology 
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_2] Initializing processor nodes of the topology 
INFO [StreamThread-1] o.a.k.s.p.i.StreamTask - task [0_3] Initializing processor nodes of the topology 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Shutting down 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing a task 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Flushing state stores of task 0_3 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_1 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_2 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Closing the state manager of task 0_3 
INFO [StreamThread-1] o.a.k.c.p.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all active tasks [[0_1, 0_2, 0_3]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Removing all standby tasks [[]] 
INFO [StreamThread-1] o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-1] Stream thread shutdown complete 
WARN [StreamThread-1] o.a.k.s.p.i.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING 

まず、非常に簡単であるとするので、処理が長い時間を取っていたことは非常に考えにくいですアプリは数ヶ月間ログのようなメッセージなしで実行されていました。

ログから判断して、カフカストリームはグループに再結合しましたが、突然例外なくシャットダウンしました。私は別のマシン上で実行されている2つのストリームアプリを持っていて、ブローカーが再起動したときに同時に両方がシャットダウンされました。

この問題はどのようにデバッグできますか?少なくとも例外を投げてはいけませんか? もう1つの問題は、ストリームスレッドがシャットダウンしている間、残りのアプリケーションが正常に動作していたため、自動的に再起動されなかったことです。何とかこれをキャッチしてスレッドを再開できますか?保持ポリシーにより、消費者にとっては非常に望ましくないことがあります。どうすればkafkaストリームアプリをより信頼できるものにできますか?

ありがとうございます!

答えて

1

ログからは言い難いです。たぶんDEBUGログは、より多くの情報を明らかにするでしょう...

Initializing processor nodes of the topologyの間にエラーがあっただけかもしれません。しかし、例外があった場合は、実際にログに記録する必要があります。ライブラリ内のバグでもあります。

アプリケーションの監視について、あなたは複数のオプションがあります。

  • をあなたはStreamThreadどうかを例外バブルかどうかを確認するためにKafkaStreams#setUncaughtExceptionHandler()を登録することができますので、スレッドが
  • を死ぬあなたが見るためにKafkaStreams#setStateListener()を登録することができますアプリがNOT_RUNNING状態になっている場合(btw:NOT_RUNNING状態の0.10.2と0.11.0の1つの既知の問題がtrunkに修正されました。すべてのスレッドが停止している場合、状態はまだRUNNINGです。手動でまだ実行されているスレッドの数を監視する)

Btw:複数の重要なバグ修正が含まれている0.10.2.1にアップグレードすることをお勧めします。

関連する問題