トピックパーティション内でメッセージの並べ替えを行い、順序付けされたメッセージを新しいトピックに送信しようとしています。Apache Kafkaは、その値に基づいてウィンドウメッセージを並べ替えます。
私は、次の形式の文字列のメッセージ送信カフカの出版社があります。例えば {system_timestamp}-{event_name}?{parameters}
を:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
また、私たちは対応にそれらを送信するために、各メッセージのいくつかのメッセージキーを追加しますパーティション。私が何をしたいか
が{システム・タイムスタンプ}に基づいてイベントを並べ替えるメッセージの一部と1分間のウィンドウ内で、サイト運営者の原因となるのは、メッセージが{に従って送信されることを保証するものではありませんシステムタイムスタンプ}値。
例えば、より大きい{システムタイムスタンプ}値のメッセージをトピックに配信することができます。
私はカフカストリームAPIを調査し、メッセージウィンドウと集計に関するいくつかの例を見つけた:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.
/* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
() -> "", // initial value
(aggKey, value, aggregate) -> aggregate + "", // aggregating value
TimeWindows.of(1000), // intervals in milliseconds
Serdes.String(), // serde for aggregated value
"test-store"
);*/
しかし、私は、このグループ化されたストリームに次に何をすべきか?私は見ていない 'ソートを()(E1は、E2) - > e1.compareTo(E2)' メソッドが利用でき、また、窓は集約のようなメソッド()に適用することができ、は減らす()、count()でも、私はメッセージのデータ操作は必要ありません。
1分間のウィンドウでメッセージを並べ替えて別のトピックに送信するにはどうすればよいですか?
を。だから、キーは代わりに(タイムスタンプ、いくつかのユニークキー)ペアまたは値がコレクションでなければなりません。 –