私はロケーションイベント(key = user_id、value = user_location)を送信するカフカのトピックを持っています。私はKStream
として読んで、それを処理することができる午前:うまく動作しますが、私は、各ユーザーの最後の既知の位置にKTable
を持っているしたいと思いますKafka Streams API:KStream to KTable
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
。どうすればいい?
私はへの書き込みと中間トピックから読んでそれを行うことができる午前:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
がKStream
からKTable
を取得するための簡単な方法はありますか?これはKafka Streamsを使用した初めてのアプリなので、明らかに何かが不足している可能性があります。
私はあなたのアプローチを 'KStream'から' KStream'を使って構築しようとしています。ダムの 'groupByKey'メソッドは' groupByKey'メソッドを解決することはできません。あなたは何が間違っているかもしれないか考えていますか? (私はJavaエコシステムとkafkasを初めて使用しています) – LetsPlayYahtzee
Streamsのバージョンは何ですか?古いバージョンの場合、 'stream.groupByKey()。reduce(...)'の代わりに 'stream.reduceByKey(...)'でなければなりません。 http://docs.confluent.io/3.1.0/streams/upgrade-guide.html#stream-grouping-and-aggregation –
最新のバージョンを使用していると思っていましたが、私は '0.10.0'を使っていましたwhile '0.10.1'バージョンのドキュメントを見ています。だから私はそれを修正:) thnx – LetsPlayYahtzee