現在、あるカフカクラスタ上のトピックから別のカフカクラスタへのメッセージを簡単にストリーミングしようとしています(リモート→ローカルクラスタ)。
Kafka-Streamsをすぐに使用して、ローカルクラスタ上の実際のメッセージを複製する必要はなく、Kafka-Streams処理の「結果」をKafka-Topicsにのみ取得することです。1つのカフカクラスタから別のカフカクラスタへのメッセージのストリーミング
WordCountのデモは、私のものよりも別のPCの1つのカフカインスタンスにあるとしましょう。私はローカルマシン上でカフカインスタンスを実行しています。
今、WordCountのデモを、単語を数えるべき文を含むトピック(「リモート」)上で実行させたいと考えています。
しかし、カウントは、「リモート」トピックではなく、ローカルシステムのトピックに書き込む必要があります。
Kafka-Streams APIでこれが可能ですか?
など。
val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic",
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("word-counts")
wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)
val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
は非常に
をありがとう - ティム
非常にありがとう、マティアス、それを知ってよかった! –
メッセージを同期的に取り出す最良の方法は何ですか? 各メッセージで '.get()'メソッドを呼び出すだけで、ちょっとハッキリしているようです。 そのプロパティはありますか? –
'get()'を使うのは正しいです。 –