2016-11-08 12 views
2

私はカフカを使ってログイベントを処理しています。私は単純なコネクタとストリーム変換のためのKafka ConnectとKafka Streamsの基礎知識を持っています。Kafkaは単一のログイベントラインを結合ログイベントに集約します

今は次の構造を持つログファイルがある:

timestamp event_id event 

ログイベントはEVENT_ID(例えばメールログ)

例により接続されている複数のログ・ラインを有します

1234 1 START 
1235 1 INFO1 
1236 1 INFO2 
1237 1 END 

と一般的に複数のイベントがあります。

Exampは、 le:

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

タイムウィンドウ(STARTとENDの間)は最大5分です。

は、結果として私が欲しい

event_id combined_log 

例のようなトピック:

1 START,INFO1,INFO2,END 
2 START,INFO2,END 

これを達成するための適切なツールは何ですか?私はカフカストリームでそれを解決しようとしましたが、どういうことか分かります。

答えて

2

あなたは本質的にメッセージペイロードに基づいてセッションやトランザクションを再構築しています。現時点では、このような機能のための組み込みの、すぐに使用できるサポートはありません。ただし、KafkaのStreams APIのProcessor API部分を使用して、この機能を自分で実装できます。ステート・ストアを使用して、指定されたキーについて、セッション/トランザクションがSTARTED、追加、および終了するときを追跡するカスタム・プロセッサーを作成できます。

メーリングリストのユーザーの中には、そのIIRCを行っている人がいますが、私があなたに指摘できる既存のコード例は知らないのですが。

注意しなければならないのは、不注意なデータを適切に処理することです。あなたの例では、あなたが適切な順序ですべての入力データを記載されている上:実際には

1234 1 START 
1234 2 START 
1235 1 INFO1 
1236 1 INFO2 
1236 2 INFO3 
1237 1 END 
1237 2 END 

しかし、メッセージ/レコードが(私は例を単純化するために、キー1でメッセージを表示)のようなので、アウト・オブ・オーダー到着します:、START -> INFO1 -> INFO2 -> ENDはなくSTART -> ENDINFO1INFO2 =データの損失をドロップする/無視して)またはSTART -> END -> INFO2 -> INFO1(間違った順序:それが起こるとしても、私はあなたのユースケースには、あなたはまだ、このデータを解釈することを

1234 1 START 
1237 1 END 
1236 1 INFO2 
1235 1 INFO1 

理解しますおそらくあなたの意味的制約にも違反します)。

+0

答えをいただきありがとうございます。私はプロセッサAPIを見ていきます。はい、注文の問題も考慮する必要があります。 – imehl

+1

プロセッサAPIは解決策です - もう一度ありがとう! – imehl

+0

@imehl:おそらく、問題を解決するために実際にやったことについての情報で、上記の質問を更新したいと思っています。 –

関連する問題