2017-08-02 11 views
1

私はカフカ版0.10.2.1私のプロジェクトのための春のブートを使用しています。カフカ重複読書

私は別のマシン上で実行されている(同じグループIDを持つ)複数の消費者が消費できる話題の5つのパーティションを持っています。私が直面していますどのような問題

です:

ログは、この問題は、カフカの消費者に生じたことを示しているとおり、私はこれらのカフカの警告ログを単一のメッセージ

Auto offset commit failed for group my-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

の重複読み取りを取得していますコミットに失敗しました。ここで

ユースケースに関するいくつかの詳細されています。同じグループID my-consumer-groupに属し

  • は私が話題My-Topicの複数の消費者を持っている

  • 消費者は、カフカからのメッセージを消費しますビジネスロジックを適用し、処理されたデータを格納するCassandra

  • カフカからのメッセージを消費するプロセスビジネスロジックを適用してからCassandra に保存すると、Kafkaからのメッセージあたり約10ミリ秒かかります。

私はカフカ消費者Beanを作成するには、次のコードを使用しています

@Configuration 
@EnableKafka 
public class KafkaConsumer { 
    @Value("${spring.kafka.bootstrap-servers}") 
    private String brokerURL; 

    @Value("${spring.kafka.session.timeout}") 
    private int sessionTimeout; 

    @Value("${spring.kafka.consumer.my-group-id}") 
    private String groupId; 

    @Value("${spring.kafka.listener.concurrency}") 
    private int concurrency; 

    @Value("${spring.kafka.listener.poll-timeout}") 
    private int timeout; 

    @Value("${spring.kafka.consumer.enable-auto-commit}") 
    private boolean autoCommit; 

    @Value("${spring.kafka.consumer.auto-commit-interval}") 
    private String autoCommitInterval; 

    @Value("${spring.kafka.consumer.auto-offset-reset}") 
    private String autoOffsetReset; 

    @Bean 
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setConcurrency(concurrency); 
     factory.getContainerProperties().setPollTimeout(timeout); 
     return factory; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
    } 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> propsMap = new HashMap<>(); 
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerURL); 
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); 
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); 
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); 
     propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 
     propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); 
     return propsMap; 
    } 
} 

これらは私が私の主な関心事は、の重複読み取りである

spring.kafka.listener.concurrency=2 
spring.kafka.listener.poll-timeout=3000 
spring.kafka.consumer.auto-commit-interval=1000 
spring.kafka.consumer.enable-auto-commit=true 
spring.kafka.consumer.auto-offset-reset=earliest 
spring.kafka.session.timeout=50000 
spring.kafka.connection.timeout=10000 
spring.kafka.topic.partition=5 
spring.kafka.message.replication=2 

を使用していますカフカ、設定されています同じ消費者グループに属する複数のカフカ消費者からのメッセージで、私のアプリケーションでは、データベースへの重複エントリーを避けなければなりません。

私は上記のKafka設定とKafka-consumer-codeを参考にして、重複した読み込みを避けることができますか?

答えて

0

簡単な答えはautoCommitを使用しないでください - スケジュール通りにコミットします。

代わりに、コンテナにコミットを実行させます。 AckModeRECORDを使用します。

ただし、あなたのコードは偶発的にする必要があります。常に再配信の可能性があります。より信頼性の高いコミット戦略により確率が小さくなることだけです。

+0

問題は、私はKafka-consumerが受け取ったメッセージに基づいてインクリメントされるCassandraにカウンタ列があることです。 重複した読み取りが発生した場合、カウンターが2回以上インクリメントされ、誤った分析が行われます。 –

+0

メッセージングの世界へようこそ。あなたのシナリオでは、「正確に1回」の配信は達成できません(私のことを信じていなければgoogle)。私が言ったように、重複配信を受ける可能性を最小限に抑えることはできますが、排除することはできません。カフカオフセットをコミットする前にmongodbを更新してからサーバがクラッシュした場合を考えてみましょう。結果 - 再配信。それが重要な場合は、最初にmongoをチェックして、すでにこのイベントを保存しているかどうかを確認する必要があります。 –

+0

ありがとうございます@Gary。あなたの答えは本当に私を助けました。正確に1回 "配達が不可能ならば、銀行とミッションクリティカルなシステムの仕組みはわかっていますが、RDBMSを使用していますが、どのメッセージングツールを使用しているのですか? –

関連する問題