2016-09-08 15 views
0

私は何かを間違っているかどうかわかりませんが、それを動作させることができませんでした。以下は私のコードです:注釈付きの春のブートでkafkaリスナーコンテナはメッセージを消費しません

public class EventsApp { 

    private static final Logger log = LoggerFactory.getLogger(EventsApp.class); 

    @Value("${kafka.topic:test}") 
    private String topic; 

    @Value("${kafka.messageKey:si.key}") 
    private String messageKey; 

    @Value("${kafka.broker.address:localhost:9092}") 
    private String brokerAddress; 

    @Value("${kafka.zookeeper.connect:localhost:2181}") 
    private String zookeeperConnect; 


    /** 
    * Main method, used to run the application. 
    * 
    * @param args the command line arguments 
    * @throws UnknownHostException if the local host name could not be resolved into an address 
    */ 
    public static void main(String[] args) throws UnknownHostException, Exception { 

     ConfigurableApplicationContext context 
      = new SpringApplicationBuilder(EventsApp.class) 
      .web(false) 
      .run(args); 

     MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class); 
     for (int i = 0; i < 10; i++) { 
      System.out.println("sending.."+toKafka.toString()); 
      toKafka.send(new GenericMessage<>("foo" + i)); 
     } 

     Thread.sleep(115000); 

     context.close(); 


     System.exit(0); 
    } 

    @KafkaListener(id = "baz", topics = "test", 
     containerFactory = "kafkaListenerContainerFactory") 
    public void listen(String data, Acknowledgment ack) { 
     System.out.println("----- "+data); 
     ack.acknowledge(); 
    } 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> 
    kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
      new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(1); 
     factory.getContainerProperties().setPollTimeout(3000); 
     return factory; 
    } 

    @ServiceActivator(inputChannel = "toKafka") 
    @Bean 
    public MessageHandler handler() throws Exception { 
     KafkaProducerMessageHandler<String, String> handler = 
      new KafkaProducerMessageHandler<>(kafkaTemplate()); 
     handler.setTopicExpression(new LiteralExpression(this.topic)); 
     handler.setMessageKeyExpression(new LiteralExpression(this.messageKey)); 
     return handler; 
    } 

    @Bean 
    public KafkaTemplate<String, String> kafkaTemplate() { 
     return new KafkaTemplate<>(producerFactory()); 
    } 

    @Bean 
    public ProducerFactory<String, String> producerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     props.put(ProducerConfig.RETRIES_CONFIG, 0); 
     props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 
     props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     return new DefaultKafkaProducerFactory<>(props); 
    } 



    @Bean 
    public ConsumerFactory<Integer, String> consumerFactory() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress); 
     //props.put("zookeeper.connect", this.zookeeperConnect); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "siTestGroup"); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     return new DefaultKafkaConsumerFactory<>(props); 
    } 


    @Bean 
    public TopicCreator topicCreator() { 
     return new TopicCreator(this.topic, this.zookeeperConnect); 
    } 

    public static class TopicCreator implements SmartLifecycle { 

     private final String topic; 

     private final String zkConnect; 

     private volatile boolean running; 

     public TopicCreator(String topic, String zkConnect) { 
      this.topic = topic; 
      this.zkConnect = zkConnect; 
     } 

     @Override 
     public void start() { 
      ZkUtils zkUtils = new ZkUtils(new ZkClient(this.zkConnect, 6000, 6000, 
       ZKStringSerializer$.MODULE$), null, false); 
      try { 
       AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties()); 
      } 
      catch (TopicExistsException e) { 
       // no-op 
      } 
      this.running = true; 
     } 

     @Override 
     public void stop() { 
     } 

     @Override 
     public boolean isRunning() { 
      return this.running; 
     } 

     @Override 
     public int getPhase() { 
      return Integer.MIN_VALUE; 
     } 

     @Override 
     public boolean isAutoStartup() { 
      return true; 
     } 

     @Override 
     public void stop(Runnable callback) { 
      callback.run(); 
     } 

    } 
    } 

私はメッセージを生成することができます。私は春のブートバージョン1.4.0.RELEASEとspring-integration-kafkaバージョン2.0.1.RELEASEを使用しています。消費者の設定に

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

を追加

答えて

0

してみてください。

デフォルトでは、kafkaはトピックの最後に消費者を開始します。

これでうまくいかない場合は、DEBUGログをオンにして、何が起こっているのか把握してください。

+0

こんにちはGaryさん、問題が見つかりました。 1. @EnableKafkaアノテーションがありません 2. spring-kafka 1.0.3.RELEASEでバグがあるようです。 1.0.2.RELEASEでは正常に動作しますが、1.0.3では次のエラーが発生します: java.lang.NoSuchMethodError:org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver:メソッド()Vが見つかりません \t .springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor $ KafkaHandlerMethodFactoryAdapter.createDefaultMessageHandlerMethodFactory(KafkaListenerAnnotationBeanPostProcessor.java:639) ........................ –

+0

これはバグではありませんspring-kafkaは、Spring Framework 4.3.2(https://jira.spring.io/browse/SPR-14616)の問題です。 1.0.3でSpring Framework 4.3.1を使うか、4.3.3を待つことができます。コメントにスタックトレースを入れないでください。わかりましたが、判読できません。代わりに質問を編集してください。 –

関連する問題