我々は時々のうち1後に限り20〜30分でハング新しいKafkaConsumerの「ポーリング」メソッドの呼び出し問題を抱えています3人のカフカブローカーが再起動しました!の読み込みオフセットとメタデータブロックブローカーの再起動後にKafkaConsumer
3ブローカーkafkaセットアップ(0.9.0.1)を使用しています。 コンシューマプロセスは新しいJava KafkaConsumer-APIを使用し、特定のTopicPartitionsに割り当てるのは です。私はここに実際のコードを表示することはできませんが、基本的に私たちのコードは次のように動作し様々な理由のため
:
を次のようにProperties consumerProps=loadConsumerProperties();
// bootstrap.servers=<IP1>:9092,<IP2>:9092,<IP3>:9092
// group.id="consumer_group_gwbc2
// enable.auto.commit=false
// auto.offset.reset=latest
// session.timeout.ms=30000
// key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
// value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.assign(Arrays.asList(new TopicPartition("someTopic",0)));
while (true) {
// THIS CALL sometimes blocks for a very long Time after a broker restart
ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(200);
Iterator<ConsumerRecord<String, byte[]>> recordIter = records.iterator();
while (recordIter.hasNext()) {
ConsumerRecord<String, byte[]> record = recordIter.next();
// Very fast, actually just sending a UDP Paket via Netty.
processRecord(record);
if (lastCommitHappendFiveOrMoreSecondsAgo()) {
kafkaConsumer.commitAsync();
}
}
}
kafka-topics.shは__consumer_offsetsトピックについて説明し
Topic:__consumer_offsets PartitionCount:50
ReplicationFactor:3 Configs:segment.bytes=104857600,
cleanup.policy=compact,compression.type=uncompressed
再起動したブローカのserver.logは、__consumer_offsetsトピックの特定のパーティションからのオフセットのロードに時間がかかることを示しています(この場合、約22 Minuテス)。これは、消費者の「投票」コールがブロックされている時間と相関します。
[2016-07-25 16:02:40,846] INFO [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,15] (kafka.coordinator.GroupMetadataManager)
[2016-07-25 16:25:36,697] INFO [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,15] in 1375851 milliseconds.
ローディングプロセスが非常に遅く、何ができるのでしょうか?