2017-09-06 19 views
2

JSONオブジェクトを複数のトピックに書き込むために1つのプロデューサを使用したいと思います。複数のトピックのSpring Kafka単一プロデューサ

次のコードは、私がしたいことをしていますが、KafkaTemplateにメッセージを送信する必要があることを伝えるのに、setDefaultTopic()メソッドを使用するのは間違っています。

よりsend(String topic, ? payload)メソッドを使用すると機能しません。

マイプロデューサー:

public class MyProducer { 

    @Autowired 
    private KafkaTemplate<String, ?> kafka; 

    public void send(String topic, Message<?> message) { 
     kafka.setDefaultTopic(topic); 
     kafka.send(message); 
    } 
} 

そして、私の設定:適切にこれを行う方法について

@Configuration 
public class MyProducerConfig { 

    @Bean 
    public ProducerFactory<String, String> producerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     ... 

     return new DefaultKafkaProducerFactory<>(props); 
    } 

    @Bean 
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) { 
     KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory); 
     kafkaTemplate.setMessageConverter(new StringJsonMessageConverter()); 

     return kafkaTemplate; 
    } 
} 

任意の提案ですか?

UPDATE

私はこれにコードを変更...

プロデューサー:私のコントローラで

public void send(Message<?> message) { 
    kafka.send(message); 
} 

(私はメッセージを作成するオブジェクト)。

MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName")); 
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers); 
producer.send(genericMessage); 

MessageHeadersオブジェクトにはまだIDとタイムスタンプが含まれます。

String topic = headers.get(KafkaHeaders.TOPIC, String.class); 

もし...

ProducerRecord<?, ?> producerRecord = kafka.getMessageConverter().fromMessage(message, topic); 
kafka.send(producerRecord); 

答えて

2

send(Message<?>)バリアントを使用して、メッセージコンバータは、トピックがメッセージヘッダーであることを期待:

+0

それは働いています。ありがとうございました!私は 'KafkaHeaders'クラスについて知らなかったし、' MessageHeaders'クラスはトピックの静的フィールドを提供しません。 –

2

は手動でテンプレートを呼び出す前にStringJsonMessageConverterを使用する必要がありますトピックを他の方法で判断することができます。カスタムコンバータを作成する必要があります。

デフォルトのトピックを変更すると、スレッドセーフではありません。

+0

これはテンプレートによって内部的に実行されます。私の答えを見てください。 –

関連する問題