2017-04-20 3 views

Kafka consumer joining the cluster

I am using Kafka with the consumer API (v. 0.10.0.0). Kafka is running in Docker, using the image from http://wurstmeister.github.io/kafka-docker/

I am running this simple test:

@Test
public void test2() {

    // Consumer configuration; the random group ID gives every run a fresh group.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", RandomStringUtils.randomAlphabetic(8));
    // The consumer's key is "auto.offset.reset"; "auto.offset.reset.config" is not a
    // recognized property, so the default ("latest") would have applied.
    props.put("auto.offset.reset", "earliest");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // Producer configuration.
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

    consumer.subscribe(asList(TEST_TOPIC));

    // Send one record to partition 0 and make sure it has left the client.
    producer.send(new ProducerRecord<>(TEST_TOPIC, 0, "key", "value message"));
    producer.flush();

    // Poll until at least one record arrives, print it, then commit and stop.
    boolean done = false;
    while (!done) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        if (records.count() > 0) {
            for (ConsumerRecord<String, String> rec : records) {
                System.out.println(rec.value());
            }
            consumer.commitSync();
            done = true;
        }
    }

    consumer.close();
    producer.close();
}

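The topic name and the consumer group ID are randomly generated on each run.

The behavior is very unstable: sometimes it works, and sometimes the call to .poll() starts looping with the repeated output below: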

2017-04-20 12:01:46 DEBUG NetworkClient:476 - Completed connection to node 1003 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 3 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106738, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2bea5ab4, request=RequestSend(header={api_key=10,api_version=0,correlation_id=3,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106738, sendTimeMs=1492686106738), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 4 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106840, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]3d8314f0, request=RequestSend(header={api_key=10,api_version=0,correlation_id=5,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106839, sendTimeMs=1492686106839), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:46 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:46 DEBUG Metadata:180 - Updated cluster metadata version 5 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:46 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686106941, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2df32bf7, request=RequestSend(header={api_key=10,api_version=0,correlation_id=7,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686106940, sendTimeMs=1492686106940), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 6 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107042, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]530612ba, request=RequestSend(header={api_key=10,api_version=0,correlation_id=9,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107041, sendTimeMs=1492686107041), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
2017-04-20 12:01:47 DEBUG Metadata:180 - Updated cluster metadata version 7 to Cluster(nodes = [192.168.100.80:9092 (id: 1003 rack: null)], partitions = [Partition(topic = ByjSIH, partition = 0, leader = 1003, replicas = [1003,], isr = [1003,]]) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:476 - Sending coordinator request for group RHAdpuiv to broker 192.168.100.80:9092 (id: 1003 rack: null) 
2017-04-20 12:01:47 DEBUG AbstractCoordinator:489 - Received group coordinator response ClientResponse(receivedTimeMs=1492686107144, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer[email protected]2a40cd94, request=RequestSend(header={api_key=10,api_version=0,correlation_id=11,client_id=consumer-1}, body={group_id=RHAdpuiv}), createdTimeMs=1492686107144, sendTimeMs=1492686107144), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) 
2017-04-20 12:01:47 DEBUG NetworkClient:640 - Sending metadata request {topics=[ByjSIH]} to node 1003 
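
The repeating part of the log is the group coordinator response with error_code=15. In the Kafka protocol of this version, code 15 is GROUP_COORDINATOR_NOT_AVAILABLE, which is why the client keeps refreshing metadata and asking again. As a minimal sketch for reading such lines, the helper below pulls the code out of a log line and names it; the class and method names are hypothetical, and only a few codes are mapped.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Kafka client: decodes error_code from a debug log line.
public class CoordinatorErrorCode {

    private static final Pattern ERROR_CODE = Pattern.compile("error_code=(-?\\d+)");

    // Returns the first error_code found in the line, if any.
    public static OptionalInt extractErrorCode(String logLine) {
        Matcher m = ERROR_CODE.matcher(logLine);
        return m.find() ? OptionalInt.of(Integer.parseInt(m.group(1))) : OptionalInt.empty();
    }

    // Maps a few group-related protocol error codes (0.10.x names) to readable labels.
    public static String describe(int code) {
        switch (code) {
            case 0:  return "NONE";
            case 14: return "GROUP_LOAD_IN_PROGRESS";
            case 15: return "GROUP_COORDINATOR_NOT_AVAILABLE";
            case 16: return "NOT_COORDINATOR_FOR_GROUP";
            default: return "code " + code + " (not mapped in this sketch)";
        }
    }

    public static void main(String[] args) {
        String line = "responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}";
        extractErrorCode(line).ifPresent(c -> System.out.println(c + " -> " + describe(c)));
    }
}
```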

Does anyone know what is going on? This seems like a pretty simple setup/test to me...

Answers


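I found the reason myself. I was running a consumer on a topic with only one partition. I then killed the consumer process, so there was no clean shutdown.

In this situation the broker keeps the consumer's place in the group until its session expires. If another consumer tries to join, that error occurs until the session has expired. To resolve it, one can:

- change the group ID
- wait until the session expires
- restart the broker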
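
The first two options above can also be expressed in the consumer configuration of a test. The following is only a sketch under assumptions: the class name TestConsumerConfig and the "test-" prefix are made up for illustration, and the 6000 ms value for session.timeout.ms is an assumed example that has to lie within the range the broker allows (group.min.session.timeout.ms to group.max.session.timeout.ms). It builds a Properties object with a fresh group ID per run and a shorter session timeout, so that a killed consumer is expired sooner.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper that builds consumer properties for a throwaway test consumer.
public class TestConsumerConfig {

    public static Properties forTest(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A new group ID on every run avoids joining a group that still holds a dead member.
        props.put("group.id", "test-" + UUID.randomUUID());
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        // Assumed value: a shorter session timeout makes the broker expire a killed consumer sooner.
        props.put("session.timeout.ms", "6000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = forTest("localhost:9092");
        System.out.println(props.getProperty("group.id"));
    }
}
```

The resulting Properties can be passed to new KafkaConsumer<>(props) as in the test from the question. A shorter timeout only shortens the wait; it does not replace calling consumer.close() when the process exits normally.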

If someone with more knowledge can explain this better, please do.