2017-03-06 4 views
-1

私はSpringクラウドストリームとKafkaには新しいので、kafkaトピックからjsonメッセージを消費する良い例を探しています。一般的なjsonメッセージを消費するためのSpringクラウドストリームkafkaバインダーの良い例はありますか?

おかげ

+0

これはダウンホートされ、オフトピックとしてマークされていますが、この問題を解決しました。私は、このコミュニティが常に厳しいものではないので、重要な議論を終わらせることに焦点を当てています。 stackoverflowのポイントは、プログラマーとこれの答えを助けて、私が5時間ほど解決しようとした問題を解決するのに役立ちました... – Maxincredible52

答えて

4

あなたは3つのシナリオがある実行中のアプリを取得するにはexamples from spring-cloud-stream team(&ソースプロジェクトをシンク)

を参照することができます。

コンシューマとプロデューサがSpring-Cloud-Stream(SCS)アプリの場合は、content-typeapplication/jsonと設定するだけです。

#Specific channel 
spring.cloud.stream.bindings.<channelName>.consumer.contentType=application/json 
#For all channels 
spring.cloud.stream.default.contentType=application/json 

2つ目のシナリオは、あなたのプロデューサーがSCSではありませんし、あなたの消費者がSCS、SCSは、デフォルトでペイロードに埋め込まれたヘッダを追加しているので、あなたが一緒にcontentTyperawとしてheaderModeを入れてその動作を無効にする必要があります。第三のシナリオで

#Specific channel 
spring.cloud.stream.bindings.<channelName>.consumer.headerMode=raw 
#For all channels 
spring.cloud.stream.default.consumer.headerMode=raw 

SCSプロデューサーはなくSCSの消費者である、この場合では、SCSがString(there is a issue for that)のための生のヘッダをサポートしていないため、contentTypeとしてapplication/octet-streamを使用する必要があるので、あなたが送信する必要がありますバイト単位のペイロード

#Properties 
spring.cloud.stream.default.contentType=application/octet-stream 
spring.cloud.stream.default.producer.headerMode=raw 

//Java 
byte[] payload = jacksonObjectMapper.writeValueAsBytes(entity); 
return channel.send(MessageBuilder.withPayload(payload).build()); 
関連する問題