私は現在、メッセージを消費するためにkafkaとspring-kafkaを使用しようとしています。Spring-kafkaとkafka 0.10
しかし、私はトラブル同じトピックのためのいくつかの消費者の実行を持っており、いくつかの質問があります。
1 - 私の消費者はいくつかの時間後に切断してトラブルの再接続を持っている傾向がある
定期的に上げてWARN以下私の消費者の場合:
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.poc.crawler.kafka.KafkaListener : Consuming {"some-stuff": "yes"} from topic [job15]
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Start of crawling
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Url has already been treated ==> skipping
2017-09-06 15:32:35.054 WARN 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {job15-3=OffsetAndMetadata{offset=11547, metadata=''}, job15-2=OffsetAndMetadata{offset=15550, metadata=''}} failed for group group-3: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [job15-3, job15-2] for group group-3
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] s.k.l.ConcurrentMessageListenerContainer : partitions revoked:[job15-3, job15-2]
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group group-3
これは、消費者が数秒間停止して待機する原因となります。
メッセージに記載されているとおり、消費者をsession.timeout.ms
に変更しました。30000
のようになりました。私はまだメッセージを受け取ります。 提供されたログでわかるように、レコードの処理が終了した直後に切断が発生します。 ... 30代前半には多くのことがあります。私は、彼らが同じメッセージを処理する傾向があることを見た私の消費者のログを見ながら
2 - 2つのコンシューマ・アプリケーションは、本当に多くの場合、
同じメッセージを受け取ります。私はカフカがat-least-once
だと理解しましたが、私は重複がたくさんあるとは思っていませんでした。 うまくいけば、私はredisを使用していますが、おそらく、私がしなければならないいくつかのチューニング/プロパティーを誤解しています。
CODE
注:私はauto-commit=true
とConcurrentMessageListenerContainer
を使用してますが1つのスレッドで実行しています。消費者はスレッドセーフではないサービスを使用するため、同じアプリケーションのいくつかのインスタンスを開始するだけです。
KafkaContext.java
@Slf4j
@Configuration
@EnableConfigurationProperties(value = KafkaConfig.class)
class KafkaContext {
@Bean(destroyMethod = "stop")
public ConcurrentMessageListenerContainer kafkaInListener(IKafkaListener listener, KafkaConfig config) {
final ContainerProperties containerProperties =
new ContainerProperties(config.getIn().getTopic());
containerProperties.setMessageListener(listener);
final DefaultKafkaConsumerFactory<Integer, String> defaultKafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(consumerConfigs(config));
final ConcurrentMessageListenerContainer messageListenerContainer =
new ConcurrentMessageListenerContainer<>(defaultKafkaConsumerFactory, containerProperties);
messageListenerContainer.setConcurrency(config.getConcurrency());
messageListenerContainer.setAutoStartup(false);
return messageListenerContainer;
}
private Map<String, Object> consumerConfigs(KafkaConfig config) {
final String kafkaHost = config.getHost() + ":" + config.getPort();
log.info("Crawler_Worker connecting to kafka at {} with consumerGroup {}", kafkaHost, config.getIn().getGroupId());
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getIn().getGroupId());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonNextSerializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000);
return props;
}
}
リスナー
@Slf4j
@Component
class KafkaListener implements IKafkaListener {
private final ICrawlingService crawlingService;
@Autowired
public KafkaListener(ICrawlingService crawlingService) {
this.crawlingService = crawlingService;
}
@Override
public void onMessage(ConsumerRecord<Integer, Next> consumerRecord) {
log.info("Consuming {} from topic [{}]", JSONObject.wrap(consumerRecord.value()), consumerRecord.topic());
consumerService.apply(consumerRecord.value());
}
}
こんにちは、時々誰かがあなたのコードを見なければなりません...間違ったパラメータを選んだことを指摘してくれてありがとう。私は今日これを試し、この問題を更新しておきます! – ogdabou