私はKafkaにメッセージを送るためにFluentD(最後の安定版バージョン12)を使用しています。しかし、FluentDは古いKafkaProducerを使用しているので、レコードのタイムスタンプは常に-1に設定されます。 したがって、WallclockTimestampExtractorを使用して、メッセージがカフカに到着した時点のレコードのタイムスタンプを設定する必要があります。Kafka Streams:レコードのタイムスタンプ(0.11.0)を変更する方法は?
私が本当に興味のタイムスタンプは、メッセージ内fluentdでお送りさ:
"タイムスタンプ": "1507885936"、 "ホスト": "V.X.Y.Z"カフカで
レコード表現:
オフセット= 0、タイムスタンプ= - 1、キー= NULL、値= { "タイムスタンプ": "1507885936"、 "ホスト": "VXYZ"}
iはカフカに、このようなレコードがしたい:
= 0のオフセットを、タイムスタンプ= 1507885936、キー= NULL、値= { "タイムスタンプ": "1507885936"、 "ホスト": "VXYZ"}
私の問題を回避するには、次のようになります。 - タイムスタンプセット(ProducerRecord(文字列のトピック、整数のパーティションに新しいレコードを生成するプロデューサーを書く
- (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)のタイムスタンプを抽出するために、消費者を書きます、長いタイムスタンプ、Kキー、V値)
私はKafkaStreamsソリューションがあればそれを好むでしょう。
質問にお答えできません。あなたは何を達成しようとしますか? –
ありがとうございます、@ MatthiasJ.Sax! 質問を編集し、私の要求がより明確になることを願って – sunjazz