2017-07-09 9 views
0

私はKafkaストリームを使用しており、Javaから最初のコンシューマオフセットをリセットしたいと考えています。 KafkaConsumer.seekToBeginning(...)は行うには正しいことのように聞こえるが、私はカフカストリームで作業:Kafkaストリームから最初にコンシューマオフセットをリセットします。

KafkaStreams streams = new KafkaStreams(builder, props); 
... 
streams.start(); 

私はこれを定義し、具体的なストリームパイプラインに応じて、ボンネットの下にいくつかの消費者を作るだろうと思います。それらにアクセスできますか?あるいは、プログラムでオフセットをリセットする他の方法がありますか?

答えて

1

カフカ・ストリームを使用しているため、コンシューマ・オフセットだけでなく、Streamsの内部状態ストアもリセットする必要があります。

幸運なことに、Kafkaに付属のStreamsアプリケーションリセットツールがあります。

ハンスJespersensの答えにhttps://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool

+1

これは私がすでに知っていることです。上記のキーワードは、プログラムによって記述されています。 – micgn

+0

リセットツールはプログラムであり、オープンソースなので、回答がはいの場合、コンシューマオフセットとStreams状態ストアの両方をプログラムでリセットすることができます。 –

0

ビルを参照してください、私は成功したスクリプトは、Javaコードで何を行うには、このコードを使用:

import kafka.tools.StreamsResetter; 

StreamsResetter resetter = new StreamsResetter(); 
String[] args = {"--application-id", APP_ID, "--bootstrap-servers", KAFKA_SERVERS, "--input-topics", TEST_TOPIC_NAME, "--zookeeper", ZOOKEEPER}; 
resetter.run(args); 

クラスは、私は輸入カフカのコアライブラリの一部であります

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.12</artifactId> 
     <version>${kafka.version}</version> 
    </dependency> 
関連する問題