Iカフカコンシューマーファクトリーをスプリングカフカの文書に従ってセットアップします。 しかし、groupIdは使用されていないようです。たぶん私も全部が間違っているので、私はあなたが私が何を経験したかを伝えたいと思っていました。カフカ消費者がカスタムグループに参加しません。
これは動作するようには思えない私の設定です:
@Bean
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
getConsumerProperties(),
new StringDeserializer(),
new JsonDeserializer<>(KafkaEvent.class));
}
Map<String, Object> getConsumerProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);
return props;
}
そして、私はこのように構成された@KafkaEventListener
を持って、明示的に再びのgroupIdを指定せずに:
@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC)
public class KafkaEventListener {
@Autowired
private ConsumerFactory<String, KafkaEvent> consumerFactory;
@KafkaHandler
public void listenTo(@Payload KafkaEvent event) {
LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString());
}
}
を私も見ることができます私のgroupId "myGroupId"は上記のLog Logのエラーに含まれています。しかし私が疑わしいのは、いつも別のグループIDに参加すると言っているコンシューマーコーディネーターのDEBUGロギングです。これは正しいと心配しています。
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.906 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
2017-09-04 15:28:13.907 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
また、Spring起動時にConsumerConfigが出力されます。 groupIdが間違っていることがわかりますが、他の属性は正しく引き継がれています。
私が理解する限り、ConsumerFactoryでそれを設定するか、spring.kafka.consumer.group-id
を使用してapplication.propertiesに設定することで、groupIdをグローバルに設定できます。どちらの変種もうまくいきません。私たちは、春のブート2.0.0を使用している
@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId")
:この設定では
2017-09-04 15:38:30.787 ( ) DEBUG consumer.internals.AbstractCoordinator - Received successful JoinGroup response for group myGroupId: [email protected]
:私はのgroupIdが@KafkaListener
注釈を使用して設定するとき
のみLOGは、消費者が正しいグループに加わったと述べています.M3(したがって、Spring Kafka 2.0.0.M3)