2017-08-17 11 views
1

私は、SleuthでトレースするSpring Cloud Stream(SCS)カフカ製作アプリを持っています。私は春・メッセージングを公開しようとしている「GenericMessage」(すなわちMessageHeaders +ペイロード)必要の両方によって消費される: (自動的にMessageHeadersに追加されたトレースを継続すべきである)headerMode = rawのJSON MessageHeaders

  • SCSの消費者
  • MessageHeadersをスキップしてペイロードを処理するSCS/Javaコンシューマーはいません。それが公表される前に、MessageHeaderで トレースエントリはEmbeddedHeaderUtilsすることにより、メッセージの先頭に付加されていますheaderMode = embeddedHeaders(デフォルト)で

。私の非Javaの消費者EmbeddedHeaderUtilsは、純粋なJSON、生headerMode =付きすなわち

?\n invalid-json-headers { payload }

にシリアライズしないとしてこれを処理することはできません。 MessageHeadersが全く送られていない、唯一のペイロードをシリアライズします。私は実際にはそれが作成されたとしてスルースが追加トレース+スパンのID、すなわち含め、全体GenericMessageを公開したい { payload }

すなわち :

{"headers": {"id": "x", "trace": "y", "span": "z"}, "payload": { ... }}

は、これを実現する方法はありますSCSコンシューマ向けに1つのトピックにパブリッシュするだけでなく、ペイロードだけで別のトピックにパブリッシュすること以外にも、

答えて

1

ので、アウトバウンドメッセージの​​としてそのメッセージを公開:

return MessageBuilder.withPayload(message).build(); 

あなたのメッセージは​​としてシリアル化され、あなたが適切な逆シリアル化後に必要なヘッダにアクセスしに​​をキャストすることができますこの方法。

+0

Kafka 0.11はネイティブにヘッダーをサポートします。私たちはそのバージョンをサポートするバインダーを構築中ですので、このような問題はなくなります。 –

+0

ありがとうございます - 私は今これで解決しました。 GenericMessageがペイロードに含まれているので完全ではないので、トレースインターセプタは明らかにその魔法を働かせず、自動的にトレース/スパン情報をヘッダに追加します。 –

+0

しかし、これでMappingJackson2MessageConverter.convertFromInternalによってスローされた 'com.fasterxml.jackson.databind.JsonMappingException:java.lang.StringのインスタンスをSTART_OBJECTトークンからデシリアライズできません。 'という例外がスローされます。 –

関連する問題