私はカフカ版0.10.2.1
と私のプロジェクトのための春のブートを使用しています。カフカ重複読書
私は別のマシン上で実行されている(同じグループIDを持つ)複数の消費者が消費できる話題の5つのパーティションを持っています。私が直面していますどのような問題
はです:
ログは、この問題は、カフカの消費者に生じたことを示しているとおり、私はこれらのカフカの警告ログを単一のメッセージ
Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
の重複読み取りを取得していますコミットに失敗しました。ここで
私ユースケースに関するいくつかの詳細されています。同じグループID my-consumer-group
に属し
は私が話題
My-Topic
の複数の消費者を持っている消費者は、カフカからのメッセージを消費しますビジネスロジックを適用し、処理されたデータを格納する
Cassandra
カフカからのメッセージを消費するプロセスビジネスロジックを適用してからCassandra に保存すると、Kafkaからのメッセージあたり約10ミリ秒かかります。
私はカフカ消費者Beanを作成するには、次のコードを使用しています
@Configuration
@EnableKafka
public class KafkaConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String brokerURL;
@Value("${spring.kafka.session.timeout}")
private int sessionTimeout;
@Value("${spring.kafka.consumer.my-group-id}")
private String groupId;
@Value("${spring.kafka.listener.concurrency}")
private int concurrency;
@Value("${spring.kafka.listener.poll-timeout}")
private int timeout;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(timeout);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
}
これらは私が私の主な関心事は、の重複読み取りである
spring.kafka.listener.concurrency=2
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.session.timeout=50000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2
を使用していますカフカ、設定されています同じ消費者グループに属する複数のカフカ消費者からのメッセージで、私のアプリケーションでは、データベースへの重複エントリーを避けなければなりません。
私は上記のKafka設定とKafka-consumer-codeを参考にして、重複した読み込みを避けることができますか?
問題は、私はKafka-consumerが受け取ったメッセージに基づいてインクリメントされるCassandraにカウンタ列があることです。 重複した読み取りが発生した場合、カウンターが2回以上インクリメントされ、誤った分析が行われます。 –
メッセージングの世界へようこそ。あなたのシナリオでは、「正確に1回」の配信は達成できません(私のことを信じていなければgoogle)。私が言ったように、重複配信を受ける可能性を最小限に抑えることはできますが、排除することはできません。カフカオフセットをコミットする前にmongodbを更新してからサーバがクラッシュした場合を考えてみましょう。結果 - 再配信。それが重要な場合は、最初にmongoをチェックして、すでにこのイベントを保存しているかどうかを確認する必要があります。 –
ありがとうございます@Gary。あなたの答えは本当に私を助けました。正確に1回 "配達が不可能ならば、銀行とミッションクリティカルなシステムの仕組みはわかっていますが、RDBMSを使用していますが、どのメッセージングツールを使用しているのですか? –