2017-12-21 12 views
2

私はKafkaトピックを作成し、それにメッセージを送りました。コマンドラインでKStreamをコンソールに印刷するには?

ので

bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic myTopic --from-beginning --property print.key=true --property key.separator="-" 

プリント

key1-customer1 

私は、このトピックのうち、カフカストリームを作成したいとコンソール上でこのkey1-customer1を印刷したいです。

私はそれのために次のことを書いた:

final Properties streamsConfiguration = new Properties(); 

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id"); 
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id"); 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "abc.xyz.com:9092"); 

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

// Records should be flushed every 10 seconds. This is less than the default 
// in order to keep this example interactive. 
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000); 
// For illustrative purposes we disable record caches 
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); 

final StreamsBuilder builder = new StreamsBuilder(); 
final KStream<String, String> customerStream = builder.stream("myTopic"); 
customerStream.foreach(new ForeachAction<String, String>() { 
    public void apply(String key, String value) { 
     System.out.println(key + ": " + value); 
    } 
}); 

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); 

streams.start(); 

Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 

これは失敗しません。しかし、this answerが示唆するように、これはコンソール上に何も印刷しません。

私はカフカを初めて利用しています。だから、この仕事をするための提案は私には大いに役立つだろう。

答えて

0

私はCLIENT_ID_CONFIGを設定解除して唯一のAPPLICATION_ID_CONFIGを残してしようとするだろう。カフカストリームuses the application ID to set the client ID

私はまた、あなたのカフカストリームアプリケーションが使用しているコンシューマ・グループIDのオフセットを確認します(このコンシューマ・グループIDは、アプリケーションIDに基づいています)。 kafka-consumer-groups.shツールを使用します。ストリーム・アプリケーションが、そのトピックに作成したすべてのレコードよりも先にある可能性があります。自動オフセット・リセットが最新のものに設定されているか、質問から容易に識別できないその他の理由が考えられます。

0

TL; DRPrintedを使用してください。

import org.apache.kafka.streams.kstream.Printed 
val sysout = Printed 
    .toSysOut[String, String] 
    .withLabel("customerStream") 
customerStream.print(sysout) 
関連する問題