2016-07-26 12 views
1

我々は時々のうち1後に限り20〜30分でハング新しいKafkaConsumerの「ポーリング」メソッドの呼び出し問題を抱えています3人のカフカブローカーが再起動しました!の読み込みオフセットとメタデータブロックブローカーの再起動後にKafkaConsumer

3ブローカーkafkaセットアップ(0.9.0.1)を使用しています。 コンシューマプロセスは新しいJava KafkaConsumer-APIを使用し、特定のTopicPartitionsに割り当てるのは です。私はここに実際のコードを表示することはできませんが、基本的に私たちのコードは次のように動作し様々な理由のため

を次のように

Properties consumerProps=loadConsumerProperties(); 
// bootstrap.servers=<IP1>:9092,<IP2>:9092,<IP3>:9092 
// group.id="consumer_group_gwbc2 
// enable.auto.commit=false 
// auto.offset.reset=latest 
// session.timeout.ms=30000 
// key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
// value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer 

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); 
consumer.assign(Arrays.asList(new TopicPartition("someTopic",0))); 

while (true) { 

    // THIS CALL sometimes blocks for a very long Time after a broker restart 
    ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(200); 

    Iterator<ConsumerRecord<String, byte[]>> recordIter = records.iterator(); 
    while (recordIter.hasNext()) {     
    ConsumerRecord<String, byte[]> record = recordIter.next(); 

    // Very fast, actually just sending a UDP Paket via Netty. 
    processRecord(record); 

    if (lastCommitHappendFiveOrMoreSecondsAgo()) { 
     kafkaConsumer.commitAsync(); 
    } 
    } 
} 

kafka-topics.shは__consumer_offsetsトピックについて説明し

Topic:__consumer_offsets PartitionCount:50 
ReplicationFactor:3 Configs:segment.bytes=104857600, 
cleanup.policy=compact,compression.type=uncompressed 

再起動したブローカのserver.logは、__consumer_offsetsトピックの特定のパーティションからのオフセットのロードに時間がかかることを示しています(この場合、約22 Minuテス)。これは、消費者の「投票」コールがブロックされている時間と相関します。

[2016-07-25 16:02:40,846] INFO [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,15] (kafka.coordinator.GroupMetadataManager) 
[2016-07-25 16:25:36,697] INFO [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,15] in 1375851 milliseconds. 

ローディングプロセスが非常に遅く、何ができるのでしょうか?

答えて

0

理由を確認しました。私たちのブローカーの

server.xml設定ファイルは (デフォルトではこのプロパティは、バージョン0.9.0.1のよう本当です)プロパティに

log.cleaner.enable=false 

が含まれている、これはそれが内部固め__consumer_offsetsトピックをkafkas意味しますログクリーナーが無効になっているため、 は実際には圧縮されません。実際には で、このトピックの一部のパーティションは、新しいグループコーディネーターがキャッシュを補充する必要があるときに、すべての消費者オフセットデータを読み取るために必要な時間をサーバーギガバイトのサイズに拡大しました。

関連する問題