私はそうのように、カフカストリームDSLを使用して、当社のカフカクラスタに接続カフカストリームアプリを持っている:KafkaStreams - InconsistentGroupProtocolException
KStreamBuilder builder = new KStreamBuilder();
KStream<String, byte[]> stream = builder.stream(myTopic);
// do work
kStreams = new KafkaStreams(builder, config);
kStreams.start();
そして使用して、当社のクラスタへの接続を確立し、私のコードベースの別の部分消費者のクライアントに直接。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
consumer.subscribe(Collections.singletonList(sourceTopic));
consumer.poll(500L);
// etc
consumer.close();
私はこれをやっている理由は、条件付きで(カフカストリームトポロジを含ん)アプリの他の部分をキックオフ前に、コンシューマ・グループに関するメタデータを収集することです。これを行う方法は他にもあります(さまざまなフックなど)がありますが、これらのメソッドを混在させると時々(断続的に)InconsistentGroupProtocolException
がスローされます。
誰かがなぜこれがスローされているのかを明かすことができますか?私は、ソースコード自体から何が起こっているのか判断するのは難しいですが、Kafka Streamsによって構築された基礎となるコンシューマは、KafkaConsumer
クライアントとは異なるパーティション化プロトコルを指定していると思います。とにかく、この例外を理解する上での助けには大いに感謝します
あなたは何を達成しようとしますか? –
https://issues.apache.org/jira/browse/KAFKA-4113 03/Jan/17のコメントをご覧ください。私はこの問題を経験しており、これが最も簡単な解決策だと思った – foxygen
私は見る。コンシューマーまたはアプリケーションのどちらかがアクティブであることを確認できればうまくいくと思います。したがって、いずれかを開始する前に、コンシューマ・グループにメンバーがないことを確認してください。グループがアクティブではありません。 C.f。 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L93(この呼び出しはパブリックAPIの一部ではなく、予告なく変更される場合があります)。 –