2017-09-04 7 views
0

Iカフカコンシューマーファクトリーをスプリングカフカの文書に従ってセットアップします。 しかし、groupIdは使用されていないようです。たぶん私も全部が間違っているので、私はあなたが私が何を経験したかを伝えたいと思っていました。カフカ消費者がカスタムグループに参加しません。

これは動作するようには思えない私の設定です:

@Bean 
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(
      getConsumerProperties(), 
      new StringDeserializer(), 
      new JsonDeserializer<>(KafkaEvent.class)); 
} 

Map<String, Object> getConsumerProperties() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 


    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3); 
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000); 

    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000); 

    return props; 
} 

そして、私はこのように構成された@KafkaEventListenerを持って、明示的に再びのgroupIdを指定せずに:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC) 
public class KafkaEventListener { 

    @Autowired 
    private ConsumerFactory<String, KafkaEvent> consumerFactory; 

    @KafkaHandler 
    public void listenTo(@Payload KafkaEvent event) { 
     LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString()); 
    } 

} 

を私も見ることができます私のgroupId "myGroupId"は上記のLog Logのエラーに含まれています。しかし私が疑わしいのは、いつも別のグループIDに参加すると言っているコンシューマーコーディネーターのDEBUGロギングです。これは正しいと心配しています。

2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator    - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40 
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator    - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40 
2017-09-04 15:28:13.906 ( ) INFO consumer.internals.ConsumerCoordinator    - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0 
2017-09-04 15:28:13.907 ( ) INFO consumer.internals.ConsumerCoordinator    - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0 

また、Spring起動時にConsumerConfigが出力されます。 groupIdが間違っていることがわかりますが、他の属性は正しく引き継がれています。

私が理解する限り、ConsumerFactoryでそれを設定するか、spring.kafka.consumer.group-idを使用してapplication.propertiesに設定することで、groupIdをグローバルに設定できます。どちらの変種もうまくいきません。私たちは、春のブート2.0.0を使用している

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId") 

:この設定では

2017-09-04 15:38:30.787 ( ) DEBUG consumer.internals.AbstractCoordinator    - Received successful JoinGroup response for group myGroupId: [email protected] 

:私はのgroupIdが@KafkaListener注釈を使用して設定するとき

のみLOGは、消費者が正しいグループに加わったと述べています.M3(したがって、Spring Kafka 2.0.0.M3)

答えて

1

M3のバグです。 fixed on master(2.0.3.BUILD-SNAPSHOT)(および1.3.0.M2)。今週末に2.0.0.RC1リリース候補をリリースする予定です(Spring Framework RC4を待っています)。

関連する問題