2016-11-09 1 views
1

私はKafkaConsumerの実装をjavaに持っていますが、現在は.pollメソッドを終了することはありません。デバッグモードでソースコードにドリルダウンすると、コーディネーターが見つからないため、whileループで滞っていることがわかりましたAbstractCoordinator.ensureCoordinatorKnown()にあります。KafkaConsumerは終了しません.pollメソッド - GroupCoordinatorNotAvailableException

ループでsendGroupMetadataRequest()から返された未来は、初めてorg.apache.kafka.clients.consumer.internals.SendFailedExceptionで失敗し、以後毎回org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.で失敗します。誰がなぜこれが起こるのか知っていますか?

コンソールプロデューサ/コンシューマを使用しても、正常にメッセージを送受信できるようになるのは、私がKafkaConsumerの実装を使用する場合のみです。さらに、コンシューマーは2つのサーバーで作業するため、コンシューマーの実装ではありません。

は、ここに私の消費者が使用して作成されたプロパティです。

Properties props = new Properties(); 
props.put("bootstrap.servers", "myserver:9000); 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
props.put("group.id", groupId); 
props.put("enable.auto.commit", "true"); 
props.put("auto.commit.interval.ms", "1000"); 
props.put("session.timeout.ms", "30000"); 

編集:

トピックは間違いなく消費者の開始前に作成されます。

編集2: クラスタ内のすべてのブローカーを削除して再作成しましたが、別の時点で失敗しています。 AbstractCoordinator.ensureActiveGroup()に再参加しようとしているうちに、performGroupJoin()から返された未来は、org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.で繰り返し失敗します。まだ何が起こっているのか分かりません。

編集3: 私はブローカーを削除し、異なるIDでそれらを再作成し、今.poll()方法が戻っていると、それが成功したメッセージを消費しています。私はまだそれが最初に失敗した理由を知りたいのですが、私はそれが再び起こらないことを確かめることができます。

+0

カフカのどのバージョンですか?ブローカーと消費者のための同じバージョン?カフカは本当にmyserver:9000(9092はデフォルト)を聞いていますか?コンシューママシンからTelnet経由でKafkaに接続できますか? –

+0

私はKafka 0.9.1を使用しています。はいkafkaは本当にそこを聞いて、私はJavaの1つとして作成したコンソールの消費者のために同じブローカーを使用します。グーグルのtelnetだけで、サーバー上で実行されているかどうかわかりません。私がコンソールの消費者から消費することができることを既に知っているとき、それに接続すると何が証明されますか? – annedroiid

答えて

0

ブローカーを削除して新しいブローカーを作成すると、問題が修正されました。しかし、確かに間違ってブローカーと一緒に行った。

関連する問題