2017-12-15 9 views
3

現在、あるカフカクラスタ上のトピックから別のカフカクラスタへのメッセージを簡単にストリーミングしようとしています(リモート→ローカルクラスタ)。
Kafka-Streamsをすぐに使用して、ローカルクラスタ上の実際のメッセージを複製する必要はなく、Kafka-Streams処理の「結果」をKafka-Topicsにのみ取得することです。1つのカフカクラスタから別のカフカクラスタへのメッセージのストリーミング

WordCountのデモは、私のものよりも別のPCの1つのカフカインスタンスにあるとしましょう。私はローカルマシン上でカフカインスタンスを実行しています。
今、WordCountのデモを、単語を数えるべき文を含むトピック(「リモート」)上で実行させたいと考えています。
しかし、カウントは、「リモート」トピックではなく、ローカルシステムのトピックに書き込む必要があります。

Kafka-Streams APIでこれが可能ですか?
など。

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig) 
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig) 
val wordCounts: KTable[String, Long] = textLines 
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) 
    .groupBy((_, word) => word) 
    .count("word-counts") 

wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig) 

val streams: KafkaStreams = new KafkaStreams(builder) 
streams.start() 

は非常に
をありがとう - ティム

答えて

4

カフカのストリームのみの単一のクラスタ用にビルドです。

回避策は、foreach()または類似のものを使用し、ターゲットクラスタに書き込む自分のKafkaProducerをインスタンス化することです。注意してください、あなた自身のプロデューサを使用する必要があります同期書き込み!それ以外の場合は、障害発生時にデータが失われる可能性があります。したがって、これはあまり効果的な解決策ではありません。

結果をソースクラスタに書き込み、データをターゲットクラスタにレプリケートする方がよい場合があります。とにかく、実際のデータはターゲットクラスタに長い保持時間で保存されるので、ソースクラスタ内の出力トピックの保存期間を大幅に短縮できる可能性が高いことに注意してください。これにより、ソースクラスタに必要なストレージを制限することができます。

+0

非常にありがとう、マティアス、それを知ってよかった! –

+0

メッセージを同期的に取り出す最良の方法は何ですか? 各メッセージで '.get()'メソッドを呼び出すだけで、ちょっとハッキリしているようです。 そのプロパティはありますか? –

+1

'get()'を使うのは正しいです。 –

3

をチェックしてください。これは、マティアスが上記の答えで言及しています。これはあなたの記述がきれいに当てはまります。

+0

パーフェクト!どうもありがとうございました –

関連する問題