2017-11-20 10 views
1

私はKafkaとの統合にSpring Cloud Streamを使用しようとしています。書き込まれているメッセージはJava POJOであり、期待通りに動作しています(メッセージはトピックに書き込まれていて、コンシューマアプリケーションで読むことができます)。メッセージの先頭にいくつかの不明な文字が追加されていますトピックからのメッセージをシンクするためにKafka Connectを統合しようとするとトラブルが発生する。このメッセージがカフカに押されているデフォルトの設定では Kafka製作者JSONシリアル化

 contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}} 

私は、メッセージが主人公/ヘッダーなしトピックに書かれたJavaアプリ内でカフカのプロデューサーを設定する場合:

カフカの
@Configuration 
public class KafkaProducerConfig { 

    @Bean 
    public ProducerFactory<String, Object> producerFactory() { 
     Map<String, Object> configProps = new HashMap<>(); 
     configProps.put(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "localhost:9092"); 
     configProps.put(
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
      StringSerializer.class); 
     configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class); 

     return new DefaultKafkaProducerFactory<String, Object>(configProps); 
    } 

    @Bean 
    public KafkaTemplate<String, Object> kafkaTemplate() { 
     return new KafkaTemplate<>(producerFactory()); 
    } 
} 

メッセージ:私はので

{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471} 

キー/値シリアライザを設定するだけで、コードを使って行うのではなく、application.ymlプロパティファイル内でこれを行うことができるはずです。しかし 、YMLは、(上記の)Javaで構成されたプロデューサーと同じメッセージ発生していないのIE、私は期待通りに働いていないシリアライザを指定するには、更新されたとき:カフカの

spring: 
    profiles: local 
    cloud: 
    stream: 
     bindings: 
     session: 
      destination: session 
      contentType: application/json 
     kafka: 
     binder: 
      brokers: localhost 
      zkNodes: localhost 
      defaultZkPort: 2181 
      defaultBrokerPort: 9092 
     bindings: 
      session: 
      producer: 
       configuration: 
       value: 
        serializer: org.springframework.kafka.support.serializer.JsonSerializer 
       key: 
        serializer: org.apache.kafka.common.serialization.StringSerializer 

メッセージ:

"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19" 

これはアプリケーションymlでのみ設定できますか?追加設定がありませんか?

答えて

1

プロデューサプロパティ(....session.producer.useNativeEncoding)のheaderModeuseNativeEncodingを参照してください。

headerMode

生に設定すると、ヘッダが出力に埋め込むディセーブル。メッセージヘッダーをネイティブにサポートしていないヘッダー埋め込みが必要なメッセージングミドルウェアに対してのみ有効です。非Spring Cloud Streamアプリケーションのデータを作成する場合に便利です。

デフォルト:embeddedHeaders。

をtrueに設定すると、アウトバウンドメッセージが(例えば、適切なカフカプロデューサ値シリアライザ設定)対応して構成されている必要があり、クライアントライブラリによって直接シリアライズさ

をuseNativeEncoding。この構成が使用されている場合、アウトバウンド・メッセージ・マーシャリングはバインディングのcontentTypeに基づいていません。ネイティブエンコーディングを使用する場合、インバウンドメッセージを逆シリアル化するために適切なデコーダ(例:Kafkaコンシューマ値デシリアライザ)を使用するのはコンシューマの責任です。また、ネイティブのエンコード/デコードが使用されている場合、headerModeプロパティは無視され、ヘッダーはメッセージに埋め込まれません。

デフォルト:false。

+0

ありがとうございます。しかし、私は以前、運がない人の両方を試してみました。 'headeMode = raw'を使用すると、バイトストリームのように見えます。 'useNativeEncoding = true'を使うと、' application/octet-stream'というコンテンツタイプのメッセージが出てきますが、ヘッダにはまだヘッダが入っています。 – Will

0

上記の答えは@Garyです!

完全性のために、現在私のために働いている構成は以下のとおりです。

spring: 
    profiles: local 
    cloud: 
    stream: 
     bindings: 
     session: 
      producer: 
      useNativeEncoding: true 
      destination: session 
      contentType: application/json 
     kafka: 
     binder: 
      brokers: localhost 
      zkNodes: localhost 
      defaultZkPort: 2181 
      defaultBrokerPort: 9092 
     bindings: 
      session: 
      producer: 
       configuration: 
       value: 
        serializer: org.springframework.kafka.support.serializer.JsonSerializer 
       key: 
        serializer: org.apache.kafka.common.serialization.StringSerializer