私はSpringクラウドストリームとKafkaには新しいので、kafkaトピックからjsonメッセージを消費する良い例を探しています。一般的なjsonメッセージを消費するためのSpringクラウドストリームkafkaバインダーの良い例はありますか?
おかげ
私はSpringクラウドストリームとKafkaには新しいので、kafkaトピックからjsonメッセージを消費する良い例を探しています。一般的なjsonメッセージを消費するためのSpringクラウドストリームkafkaバインダーの良い例はありますか?
おかげ
あなたは3つのシナリオがある実行中のアプリを取得するにはexamples from spring-cloud-stream team(&ソースプロジェクトをシンク)
を参照することができます。
コンシューマとプロデューサがSpring-Cloud-Stream(SCS)アプリの場合は、content-type
をapplication/json
と設定するだけです。
#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.contentType=application/json
#For all channels
spring.cloud.stream.default.contentType=application/json
2つ目のシナリオは、あなたのプロデューサーがSCSではありませんし、あなたの消費者がSCS、SCSは、デフォルトでペイロードに埋め込まれたヘッダを追加しているので、あなたが一緒にcontentType
でraw
としてheaderMode
を入れてその動作を無効にする必要があります。第三のシナリオで
#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.headerMode=raw
#For all channels
spring.cloud.stream.default.consumer.headerMode=raw
SCSプロデューサーはなくSCSの消費者である、この場合では、SCSがString(there is a issue for that)のための生のヘッダをサポートしていないため、contentType
としてapplication/octet-stream
を使用する必要があるので、あなたが送信する必要がありますバイト単位のペイロード
#Properties
spring.cloud.stream.default.contentType=application/octet-stream
spring.cloud.stream.default.producer.headerMode=raw
//Java
byte[] payload = jacksonObjectMapper.writeValueAsBytes(entity);
return channel.send(MessageBuilder.withPayload(payload).build());
これはダウンホートされ、オフトピックとしてマークされていますが、この問題を解決しました。私は、このコミュニティが常に厳しいものではないので、重要な議論を終わらせることに焦点を当てています。 stackoverflowのポイントは、プログラマーとこれの答えを助けて、私が5時間ほど解決しようとした問題を解決するのに役立ちました... – Maxincredible52