私はKafkaとの統合にSpring Cloud Streamを使用しようとしています。書き込まれているメッセージはJava POJOであり、期待通りに動作しています(メッセージはトピックに書き込まれていて、コンシューマアプリケーションで読むことができます)。メッセージの先頭にいくつかの不明な文字が追加されていますトピックからのメッセージをシンクするためにKafka Connectを統合しようとするとトラブルが発生する。このメッセージがカフカに押されているデフォルトの設定では Kafka製作者JSONシリアル化
: contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}
私は、メッセージが主人公/ヘッダーなしトピックに書かれたJavaアプリ内でカフカのプロデューサーを設定する場合:
カフカの@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<String, Object>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
メッセージ:私はので
{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}
キー/値シリアライザを設定するだけで、コードを使って行うのではなく、application.yml
プロパティファイル内でこれを行うことができるはずです。しかし 、YMLは、(上記の)Javaで構成されたプロデューサーと同じメッセージ発生していないのIE、私は期待通りに働いていないシリアライザを指定するには、更新されたとき:カフカの
spring:
profiles: local
cloud:
stream:
bindings:
session:
destination: session
contentType: application/json
kafka:
binder:
brokers: localhost
zkNodes: localhost
defaultZkPort: 2181
defaultBrokerPort: 9092
bindings:
session:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
メッセージ:
を"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"
これはアプリケーションymlでのみ設定できますか?追加設定がありませんか?
ありがとうございます。しかし、私は以前、運がない人の両方を試してみました。 'headeMode = raw'を使用すると、バイトストリームのように見えます。 'useNativeEncoding = true'を使うと、' application/octet-stream'というコンテンツタイプのメッセージが出てきますが、ヘッダにはまだヘッダが入っています。 – Will