2017-05-12 9 views
3

トピックパーティション内でメッセージの並べ替えを行い、順序付けされたメッセージを新しいトピックに送信しようとしています。Apache Kafkaは、その値に基づいてウィンドウメッセージを並べ替えます。

私は、次の形式の文字列のメッセージ送信カフカの出版社があります。例えば {system_timestamp}-{event_name}?{parameters}

を:

1494002667893-client.message?chatName=1c&messageBody=hello 
1494002656558-chat.started?chatName=1c&chatPatricipants=3 

また、私たちは対応にそれらを送信するために、各メッセージのいくつかのメッセージキーを追加しますパーティション。私が何をしたいか

{システム・タイムスタンプ}に基づいてイベントを並べ替えるメッセージの一部と1分間のウィンドウ内で、サイト運営者の原因となるのは、メッセージが{に従って送信されることを保証するものではありませんシステムタイムスタンプ}値。

例えば、より大きい{システムタイムスタンプ}値のメッセージをトピックに配信することができます。

私はカフカストリームAPIを調査し、メッセージウィンドウと集計に関するいくつかの例を見つけた:

Properties streamsConfiguration = new Properties(); 
     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter"); 
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); 
     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> stream = builder.stream("events"); 
KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion. 

    /* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing. 
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
       () -> "", // initial value 
       (aggKey, value, aggregate) -> aggregate + "", // aggregating value 
       TimeWindows.of(1000), // intervals in milliseconds 
       Serdes.String(), // serde for aggregated value 
       "test-store" 
     );*/ 

しかし、私は、このグループ化されたストリームに次に何をすべきか?私は見ていない 'ソートを()(E1は、E2) - > e1.compareTo(E2)' メソッドが利用でき、また、窓は集約のようなメソッド()に適用することができ、は減らす()count()でも、私はメッセージのデータ操作は必要ありません。

1分間のウィンドウでメッセージを並べ替えて別のトピックに送信するにはどうすればよいですか?

答えて

1

ここ概要は次のとおり

は、そのプロセッサの実装を作成:プロセスにおける

  • ()各メッセージのための方法であって、

    • メッセージ値
    • からタイムスタンプを読み出します
    • は、キーとして(timestamp、message-key)ペアと値としてmessage-valueを使用してKeyValueStoreに挿入します。注意:これはまた重複排除を提供します。カスタムSerdeを用意して、キーをシリアル化してタイムスタンプが最初に来るようにする必要があります。これにより、最初にタイムスタンプで遠隔クエリを並べ替えるようになります。断続()メソッドで
    • 範囲タイムスタンプに0からフェッチ使用してストア読み出し - 60'000(= 1分)
    • が使用順序でフェッチメッセージを送信コンテキスト。新しいMSGのは、「ストリームの時間」を進めるために到着しない場合には、前方()と店舗

このアプローチの問題からそれらを削除トリガーされない断続()です。これがあなたの場合のリスクである場合は、トピックの各パーティション(!)に定期的な「ティック」メッセージを送信する外部スケジューラを作成することができます。プロセッサは無視する必要がありますが、の "本当の" msgs。 KIP-138は、システム・タイム・句読点を明示的にサポートを追加することにより、この制限に対処します: プロデューサーが同じミリ秒内に同じキーを持つ複数のメッセージを発したときに、いくつかのより多くの、これはうまく動作しませんそれについて考えたhttps://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics

+0

を。だから、キーは代わりに(タイムスタンプ、いくつかのユニークキー)ペアまたは値がコレクションでなければなりません。 –

関連する問題