JSONオブジェクトを複数のトピックに書き込むために1つのプロデューサを使用したいと思います。複数のトピックのSpring Kafka単一プロデューサ
次のコードは、私がしたいことをしていますが、KafkaTemplate
にメッセージを送信する必要があることを伝えるのに、setDefaultTopic()
メソッドを使用するのは間違っています。
よりsend(String topic, ? payload)
メソッドを使用すると機能しません。
マイプロデューサー:
public class MyProducer {
@Autowired
private KafkaTemplate<String, ?> kafka;
public void send(String topic, Message<?> message) {
kafka.setDefaultTopic(topic);
kafka.send(message);
}
}
そして、私の設定:適切にこれを行う方法について
@Configuration
public class MyProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());
return kafkaTemplate;
}
}
任意の提案ですか?
UPDATE
私はこれにコードを変更...
プロデューサー:私のコントローラで
public void send(Message<?> message) {
kafka.send(message);
}
(私はメッセージを作成するオブジェクト)。
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);
MessageHeadersオブジェクトにはまだIDとタイムスタンプが含まれます。
String topic = headers.get(KafkaHeaders.TOPIC, String.class);
もし...
ProducerRecord<?, ?> producerRecord = kafka.getMessageConverter().fromMessage(message, topic);
kafka.send(producerRecord);
それは働いています。ありがとうございました!私は 'KafkaHeaders'クラスについて知らなかったし、' MessageHeaders'クラスはトピックの静的フィールドを提供しません。 –