2017-03-21 10 views
12

私はロケーションイベント(key = user_id、value = user_location)を送信するカフカのトピックを持っています。私はKStreamとして読んで、それを処理することができる午前:うまく動作しますが、私は、各ユーザーの最後の既知の位置にKTableを持っているしたいと思いますKafka Streams API:KStream to KTable

KStreamBuilder builder = new KStreamBuilder(); 

KStream<String, Location> locations = builder 
     .stream("location_topic") 
     .map((k, v) -> { 
      // some processing here, omitted form clarity 
      Location location = new Location(lat, lon); 
      return new KeyValue<>(k, location); 
     }); 

。どうすればいい?

私はへの書き込みと中間トピックから読んでそれを行うことができる午前:

// write to intermediate topic 
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux"); 

// build KTable from intermediate topic 
KTable<String, Location> table = builder.table("location_topic_aux", "store"); 

KStreamからKTableを取得するための簡単な方法はありますか?これはKafka Streamsを使用した初めてのアプリなので、明らかに何かが不足している可能性があります。

答えて

12

現時点ではこれを行うための単純な方法はありません。あなたのアプローチはConfluent FAQで説明されているように完全に有効です。http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

これはコードに関して最も簡単な方法です。ただし、(a)追加のトピックを管理する必要があり、(b)データがKafkaに書き込まれ、再読み込みされるため、追加のネットワークトラフィックが発生するという欠点があります。

一つの代替は「ダミー減らす」を用いて、あり:

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, Long> stream = ...; // some computation that creates the derived KStream 

KTable<String, Long> table = stream.groupByKey().reduce(
    new Reducer<Long>() { 
     @Override 
     public Long apply(Long aggValue, Long newValue) { 
      return newValue; 
     } 
    }, 
    "dummy-aggregation-store"); 

このアプローチは、オプション1と比べてコードに関して幾分より複雑であるが、その(A利点を有します)手作業でのトピック管理は不要であり、(b)カフカからのデータの再読み込みは不要です。オプション2では

、カフカストリームは、フォールトトレランスのためKTableをバックアップするための内部変更履歴のトピックを作成します。

全体的に、あなたがより良い好き接近する、自分で決定する必要があります。したがって、両方のアプローチでは、Kafkaで追加のストレージが必要になり、ネットワークトラフィックが増加します。全体的に、オプション2のやや複雑なコードとオプション1の手動トピック管理のトレードオフです。

+0

私はあなたのアプローチを 'KStream'から' KStream'を使って構築しようとしています。ダムの 'groupByKey'メソッドは' groupByKey'メソッドを解決することはできません。あなたは何が間違っているかもしれないか考えていますか? (私はJavaエコシステムとkafkasを初めて使用しています) – LetsPlayYahtzee

+1

Streamsのバージョンは何ですか?古いバージョンの場合、 'stream.groupByKey()。reduce(...)'の代わりに 'stream.reduceByKey(...)'でなければなりません。 http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation –

+1

最新のバージョンを使用していると思っていましたが、私は '0.10.0'を使っていましたwhile '0.10.1'バージョンのドキュメントを見ています。だから私はそれを修正:) thnx – LetsPlayYahtzee

関連する問題