0

私のデモアプリケーションでは、メッセージをkafkaキューに戻すためにrestコントローラを作成する必要があります。私は春・カフカのリファレンスガイドを読んで、消費者の設定を実装し、作成した豆をrestコントローラがspring kafka経由でkafkaのレコードを返す

@Configuration 
@EnableKafka 
public class ConsumerConfiguration { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     // list of host:port pairs used for establishing the initial connections to the Kakfa cluster 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     // allows a pool of processes to divide the work of consuming and processing records 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx"); 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, Transaction> transactionConsumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(
       consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Transaction.class) 
     ); 
    } 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Transaction>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Transaction> factory = 
       new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(transactionConsumerFactory()); 

     return factory; 
    } 

    @Bean 
    public Consumer consumer() { 
     return new Consumer(); 
    } 

} 

以下のようにして、以下の

public class Consumer { 

    private CountDownLatch latch = new CountDownLatch(1); 

    public CountDownLatch getLatch() { 
     return latch; 
    } 

    @KafkaListener(topics = "${kafka.topic.name}") 
    public void receive(Transaction transaction) { 
     latch.countDown(); 
    } 
} 

のような別のクラスの消費者はどのように私は今からトランザクションを取得するためのロジックを実装することができていますそれぞれの消費者はコントローラでヒットします。

ありがとうございます。

答えて

2

まあ、@KafkaListenerは、カフカからコールバックにレコードをストリームする独立した長寿命プロセスを生成します。あなたはREST GETイベントについて話しているので、ConsumerFactoryからKafkaConsumerを取得し、コントローラーメソッドから手動でpoll()を呼び出さないと選択肢がありません。

+0

返信いただきありがとうございますArtemに感謝します。同じガイドについては、ガイドまたはドキュメントを教えてください。 – maverick

+0

おそらくこれは:http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html? –

+0

ありがとう、私はコードを見ていた、それは春 - カフカを使用しないで、直接カフカAPIを使用しているようだ。私は正しい? – maverick

関連する問題