2017-09-06 11 views
0

私は現在、メッセージを消費するためにkafkaとspring-kafkaを使用しようとしています。Spring-kafkaとkafka 0.10

しかし、私はトラブル同じトピックのためのいくつかの消費者の実行を持っており、いくつかの質問があります。

1 - 私の消費者はいくつかの時間後に切断してトラブルの再接続を持っている傾向がある

定期的に上げてWARN以下私の消費者の場合:

2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.poc.crawler.kafka.KafkaListener  : Consuming {"some-stuff": "yes"} from topic [job15] 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Start of crawling 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Url has already been treated ==> skipping 
2017-09-06 15:32:35.054 WARN 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {job15-3=OffsetAndMetadata{offset=11547, metadata=''}, job15-2=OffsetAndMetadata{offset=15550, metadata=''}} failed for group group-3: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [job15-3, job15-2] for group group-3 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] s.k.l.ConcurrentMessageListenerContainer : partitions revoked:[job15-3, job15-2] 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group group-3 

これは、消費者が数秒間停止して待機する原因となります。

メッセージに記載されているとおり、消費者をsession.timeout.msに変更しました。30000のようになりました。私はまだメッセージを受け取ります。 提供されたログでわかるように、レコードの処理が終了した直後に切断が発生します。 ... 30代前半には多くのことがあります。私は、彼らが同じメッセージを処理する傾向があることを見た私の消費者のログを見ながら

2 - 2つのコンシューマ・アプリケーションは、本当に多くの場合、

同じメッセージを受け取ります。私はカフカがat-least-onceだと理解しましたが、私は重複がたくさんあるとは思っていませんでした。 うまくいけば、私はredisを使用していますが、おそらく、私がしなければならないいくつかのチューニング/プロパティーを誤解しています。

CODE

注:私はauto-commit=trueConcurrentMessageListenerContainerを使用してますが1つのスレッドで実行しています。消費者はスレッドセーフではないサービスを使用するため、同じアプリケーションのいくつかのインスタンスを開始するだけです。

KafkaContext.java

@Slf4j 
@Configuration 
@EnableConfigurationProperties(value = KafkaConfig.class) 
class KafkaContext { 

    @Bean(destroyMethod = "stop") 
    public ConcurrentMessageListenerContainer kafkaInListener(IKafkaListener listener, KafkaConfig config) { 
     final ContainerProperties containerProperties = 
       new ContainerProperties(config.getIn().getTopic()); 
     containerProperties.setMessageListener(listener); 
     final DefaultKafkaConsumerFactory<Integer, String> defaultKafkaConsumerFactory = 
       new DefaultKafkaConsumerFactory<>(consumerConfigs(config)); 

     final ConcurrentMessageListenerContainer messageListenerContainer = 
       new ConcurrentMessageListenerContainer<>(defaultKafkaConsumerFactory, containerProperties); 

     messageListenerContainer.setConcurrency(config.getConcurrency()); 
     messageListenerContainer.setAutoStartup(false); 
     return messageListenerContainer; 
    } 

    private Map<String, Object> consumerConfigs(KafkaConfig config) { 
     final String kafkaHost = config.getHost() + ":" + config.getPort(); 
     log.info("Crawler_Worker connecting to kafka at {} with consumerGroup {}", kafkaHost, config.getIn().getGroupId()); 
     final Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getIn().getGroupId()); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonNextSerializer.class); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
     props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000); 
     return props; 
    } 

} 

リスナー

@Slf4j 
@Component 
class KafkaListener implements IKafkaListener { 

    private final ICrawlingService crawlingService; 

    @Autowired 
    public KafkaListener(ICrawlingService crawlingService) { 
     this.crawlingService = crawlingService; 
    } 

    @Override 
    public void onMessage(ConsumerRecord<Integer, Next> consumerRecord) { 
     log.info("Consuming {} from topic [{}]", JSONObject.wrap(consumerRecord.value()), consumerRecord.topic()); 

     consumerService.apply(consumerRecord.value()); 
    } 
} 

答えて

0

ここでの主な問題は、あなたの消費者グループは、継続的にリバランスされていることです。 session.timeout.msを増やしても構いませんが、私の設定ではこの設定が適用されていません。削除してください:

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000); 

と設定:

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); 
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); 

あなたはブローカーとの通信に、より良いパフォーマンスを得るためにMAX_POLL_RECORDS_CONFIGを増やすことができます。しかし、1つのスレッドでのみメッセージを処理する場合は、この値を低く保つ方が安全です。

+0

こんにちは、時々誰かがあなたのコードを見なければなりません...間違ったパラメータを選んだことを指摘してくれてありがとう。私は今日これを試し、この問題を更新しておきます! – ogdabou

関連する問題