2017-09-20 5 views
0

私はSpring Kafkaを初めて使用しています。ここで述べたように私のコンシューマーコードでAcknowledgement.acknowledge()メソッドを使用することはできません。私の春のブートアプリケーションです。手動コミットプロセスを使用していない場合、コードが正常に動作しています。しかし私が Acknowledgement.acknowledge()を使用すると、手動コミットのためにBeanに関連するエラーが表示されます。また、私が手動コミットを正しく使用していない場合は、それを行う正しい方法を教えてください。手動コミットのためのSpring KafkaのAcknowledgement.acknowledge()メソッドの使用方法

エラーメッセージ:

*************************** 
APPLICATION FAILED TO START 
*************************** 

Description: 

Field ack in Receiver required a bean of type 'org.springframework.kafka.support.Acknowledgment' that could not be found. 


Action: 

Consider defining a bean of type 'org.springframework.kafka.support.Acknowledgment' in your configuration. 

私は@Componentを追加する必要がありますが、それは私の消費者のコードであり、すでにあることが判明し、このエラーをGoogleで検索。

マイコンシューマコードは次のようになります。Receiver.java

import java.util.concurrent.CountDownLatch; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.kafka.support.Acknowledgment; 
import org.springframework.stereotype.Component; 

@Component 
public class Receiver { 

    @Autowired 
    public Acknowledgment ack; 

    private CountDownLatch latch = new CountDownLatch(1); 

    @KafkaListener(topics = "${kafka.topic.TestTopic}") 
    public void receive(ConsumerRecord<?, ?> consumerRecord){ 
      System.out.println(consumerRecord.value()); 
      latch.countDown(); 
      ack.acknowledge(); 
    } 
} 

を私のプロデューサーのコードは次のようになります。Sender.java

import java.util.Map; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.kafka.core.KafkaTemplate; 
import org.springframework.stereotype.Component; 

@Component 
public class Sender { 

    @Autowired 
    private KafkaTemplate<String, Map<String, Object>> kafkaTemplate; 

    public void send(Map<String, Object> map){ 
      kafkaTemplate.send("TestTopic", map); 

    } 

} 

EDIT 1:

私の新しい消費者コードは次のようになります。Receiver.java

import java.util.concurrent.CountDownLatch; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.kafka.support.Acknowledgment; 
import org.springframework.stereotype.Component; 

@Component 
public class Receiver { 

    private CountDownLatch latch = new CountDownLatch(1); 

    @KafkaListener(topics = "${kafka.topic.TestTopic}", containerFactory = "kafkaManualAckListenerContainerFactory") 
    public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack){ 
      System.out.println(consumerRecord.value()); 
      latch.countDown(); 
      ack.acknowledge(); 
    } 
} 

私も自分の設定クラスを変更:

import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.kafka.annotation.EnableKafka; 
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 
import org.springframework.kafka.core.ConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 

@Configuration 
@EnableKafka 
public class ReceiverConfig { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Value("${spring.kafka.consumer.group-id}") 
    private String consumerGroupId; 

    @Bean 
    public Map<String, Object> consumerConfigs() throws SendGridException { 
      Map<String, Object> props = new HashMap<>(); 
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
      props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); 
      return props; 

    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory(){ 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    /*@Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(){ 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 

     return factory; 
    }*/ 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(){ 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     return factory; 
    } 

    @Bean 
    public Receiver receiver() { 
     return new Receiver(); 
    } 
} 

私は以下のエラーを取得しています、私の受信()メソッドにcontainerFactory =「kafkaManualAckListenerContainerFactory」を追加した後。

*************************** 
APPLICATION FAILED TO START 
*************************** 

Description: 

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found. 
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found bean 'consumerFactory' 


Action: 

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration. 

答えて

4

あなたは本当にdocumentationに従ってください:

マニュアルAckModeを使用する場合は、リスナーもAcknowledgmentを設けることができます。この例では、別のコンテナファクトリを使用する方法も示しています。本当にどこにもありません

@KafkaListener(id = "baz", topics = "myTopic", 
      containerFactory = "kafkaManualAckListenerContainerFactory") 
public void listen(String data, Acknowledgment ack) { 
    ... 
    ack.acknowledge(); 
} 

AcknowledgmentがBeanであることに注意してください。したがって、 @KafkaListenerメソッドのシグネチャを適切に変更して、@Autowiredという疑わしいメッセージを削除してください。このオブジェクトは受信したメッセージの一部(ヘッダー)なので、存在しません。

+0

こんにちは@Artem、私は変更を行いましたが、別のBeanエラーが発生しました。新しい編集で質問を更新します。春のカフカの文書はそれほど明確ではありません。 – kumarhimanshu449

+0

1.あなたの 'receive()'メソッドに 'Acknowledgement' argがまだありません。だから、私はその 'ack' varが何であるか分かりません。 2.あなたは生のSpring Kafka設定とSpring Bootの組み合わせをいくつか持っています。そこから開始することを検討する必要があります:https://docs.spring.io/spring-boot/docs/1.5.7.RELEASE/reference/htmlsingle/#boot-features-kafkaそしてすぐに再構成してください適切な 'ConsumerFactory' beanを介して' kafkaListenerContainerFactory'ビーンを生成します。あなたのケースでは、それは 'ConsumerFactory 'ですが、 'ConsumerFactory ' –

関連する問題