3

私はKafkaにメッセージを送るためにFluentD(最後の安定版バージョン12)を使用しています。しかし、FluentDは古いKafkaProducerを使用しているので、レコードのタイムスタンプは常に-1に設定されます。 したがって、WallclockTimestampExtractorを使用して、メッセージがカフカに到着した時点のレコードのタイムスタンプを設定する必要があります。Kafka Streams:レコードのタイムスタンプ(0.11.0)を変更する方法は?

私が本当に興味のタイムスタンプは、メッセージ内fluentdでお送りさ

"タイムスタンプ": "1507885936"、 "ホスト": "V.X.Y.Z"カフカで

レコード表現:

オフセット= 0、タイムスタンプ= - 1、キー= NULL、値= { "タイムスタンプ": "1507885936"、 "ホスト": "VXYZ"}

iはカフカに、このようなレコードがしたい:

= 0のオフセットを、タイムスタンプ= 1507885936、キー= NULL、値= { "タイムスタンプ": "1507885936"、 "ホスト": "VXYZ"}

私の問題を回避するには、次のようになります。 - タイムスタンプセット(ProducerRecord(文字列のトピック、整数のパーティションに新しいレコードを生成するプロデューサーを書く

私はKafkaStreamsソリューションがあればそれを好むでしょう。

+0

質問にお答えできません。あなたは何を達成しようとしますか? –

+0

ありがとうございます、@ MatthiasJ.Sax! 質問を編集し、私の要求がより明確になることを願って – sunjazz

答えて

5

あなたは次のように非常に単純なカフカストリームアプリケーションを書くことができます。

KStreamBuilder builder = new KStreamBuilder(); 
builder.stream("input-topic").to("output-topic"); 

やレコードからタイムスタンプを抽出し、それを返すカスタムTimestampExtractorでアプリケーションを設定します。

Kafka Streamsは、カフカにレコードを書き込むときに、返されたタイムスタンプを使用します。

注:タイム・スタンプが厳密に順序付けされていない注文データがある場合、結果にはタイム・スタンプの順序も含まれません。 Kafka Streamsは、返されたタイムスタンプを使用してKafkaに書き戻します(つまり、抽出されたものがレコードメタデータのタイムスタンプとして使用されます)。書き込み時には、現在処理されている入力レコードのタイムスタンプが、生成されたすべての出力レコードに使用されます。これは、バージョン1.0では保持されますが、将来のリリースでは変更される可能性があります。

+0

これは私が探していた文章です:「Kafka Streamsは記録をKafkaに書き戻すときに返されるタイムスタンプを使用します。音楽を私の耳に。ありがとうございました! –

+0

ちょうど質問を更新しました: "ノート"は完全には正しくありませんでした... –

+0

更新のおかげでよかったです。私は単に移行を把握しようとしていますが、私は出力タイムスタンプを制御できるかどうかわからないうちに発汗していました(後で他のトピックと結合することができます) –