2017-01-11 9 views
2

私はそうのように、カフカストリームDSLを使用して、当社のカフカクラスタに接続カフカストリームアプリを持っている:KafkaStreams - InconsistentGroupProtocolException

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, byte[]> stream = builder.stream(myTopic); 

// do work 

kStreams = new KafkaStreams(builder, config); 
kStreams.start(); 

そして使用して、当社のクラスタへの接続を確立し、私のコードベースの別の部分消費者のクライアントに直接。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer); 
consumer.subscribe(Collections.singletonList(sourceTopic)); 
consumer.poll(500L); 
// etc 
consumer.close(); 

私はこれをやっている理由は、条件付きで(カフカストリームトポロジを含ん)アプリの他の部分をキックオフ前に、コンシューマ・グループに関するメタデータを収集することです。これを行う方法は他にもあります(さまざまなフックなど)がありますが、これらのメソッドを混在させると時々(断続的に)InconsistentGroupProtocolExceptionがスローされます。

誰かがなぜこれがスローされているのかを明かすことができますか?私は、ソースコード自体から何が起こっているのか判断するのは難しいですが、Kafka Streamsによって構築された基礎となるコンシューマは、KafkaConsumerクライアントとは異なるパーティション化プロトコルを指定していると思います。とにかく、この例外を理解する上での助けには大いに感謝します

+0

あなたは何を達成しようとしますか? –

+0

https://issues.apache.org/jira/browse/KAFKA-4113 03/Jan/17のコメントをご覧ください。私はこの問題を経験しており、これが最も簡単な解決策だと思った – foxygen

+0

私は見る。コンシューマーまたはアプリケーションのどちらかがアクティブであることを確認できればうまくいくと思います。したがって、いずれかを開始する前に、コンシューマ・グループにメンバーがないことを確認してください。グループがアクティブではありません。 C.f。 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L93(この呼び出しはパブリックAPIの一部ではなく、予告なく変更される場合があります)。 –

答えて

3

あなた自身で答えを入れます。 Kafka Streamsはカスタムパーティション割り当てを使用し、Kafka Streamsクライアントは他のKafka Streamsクライアントでのみ動作します。あなたのKafka Streamsアプリケーションと同じグループIDを持つKafkaConsumerを使用すると、KafkaConsumerをオフにしてKafka Streamsコンシューマグループに参加できなくなります。明らかに、KafkaConsumerはカフカストリームで「再生」できません。

関連する問題