2016-06-13 5 views
0

消費者がcomitAsyncをログの最初の2つのオフセットにすることができない奇妙な問題に直面しました。 ..私は引用誰か.Can消費者によって受信され、succesfulyコミットプロデューサーの同じ非同期送信で他のメッセージがこの問題の原因を見つけるので、それは非常に奇妙である私の次のコードと出力例commitAsyncが最初の2つのオフセットをコミットできないのはなぜですか?

package com.panos.example; 

import kafka.utils.ShutdownableThread; 
import org.apache.kafka.clients.consumer.*; 

import org.apache.kafka.common.TopicPartition; 

import java.util.Collections; 
import java.util.Map; 
import java.util.Properties; 

public class Consumer extends ShutdownableThread { 
    private final KafkaConsumer<Integer, String> consumer; 
    private final String topic; 

    public Consumer(String topic) { 
     super("KafkaConsumerExample", false); 
     Properties props = new Properties(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.75:9092"); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
     props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); 
     props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 

     consumer = new KafkaConsumer<Integer, String>(props); 
     this.topic = topic; 
    } 

    @Override 
    public void doWork() { 

     consumer.subscribe(Collections.singletonList(this.topic)); 
     try { 
      ConsumerRecords<Integer, String> records = consumer.poll(1000); 
      long startTime = System.currentTimeMillis(); 
      if (!records.isEmpty()) { 
       System.out.println("C : {} Total No. of records received : {}" + records.count()); 

       for (ConsumerRecord<Integer, String> record : records) { 
        System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); 
        consumer.commitAsync(new ConsumerCallBack(startTime,record.value(), record.offset())); 
       } 

      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public String name() { 
     return null; 
    } 

    @Override 
    public boolean isInterruptible() { 
     return false; 
    } 


    class ConsumerCallBack implements OffsetCommitCallback { 

     private final long startTime; 
     private String message; 
     private final String NewLine = System.getProperty("line.separator"); 
     private long offset; 

     public ConsumerCallBack(long startTime) { 
      this.startTime = startTime; 
     } 

     public ConsumerCallBack(long startTime, String message, long offset) { 
      this.startTime = startTime; 
      this.message=message; 
      this.offset = offset; 
     } 

     public void onComplete(Map<TopicPartition, OffsetAndMetadata> CurrentOffset, 
           Exception exception) { 
      long elapsedTime = System.currentTimeMillis() - startTime; 
      if (exception != null) { 
       System.out.println("Message : {" + message + "}, committed successfully at offset " + offset + 
         CurrentOffset + "elapsed time :" + elapsedTime); 
      } else { 
       System.out.println(exception.toString()); 
       /* JOptionPane.showMessageDialog(new Frame(), 
         "Something Goes Wrong with the Server Please Try again Later.", 
         "Inane error", 
         JOptionPane.ERROR_MESSAGE);*/ 
      } 
     } 
    } 
} 

あなたはすべてのメッセージが正常にコミットされたことを見ることができます。なぜこれが起こるのですか?

Received message: (1, Message_1) at offset 160 
Received message: (2, Message_2) at offset 161 
Received message: (3, Message_3) at offset 162 
Received message: (4, Message_4) at offset 163 
Message : {Message_3}, committed successfully at offset 162{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6 
Message : {Message_4}, committed successfully at offset 163{test-0=OffsetAndMetadata{offset=164, metadata=''}}elapsed time :6 

答えて

2

あなたがcommitAsyncを使用している場合、それは、複数のコミットを単一のコミットメッセージの中に一緒に押しつぶされていることを発生することがあります。オフセットが増加する順序でコミットされるので、オフセットXのコミットは、Xより小さいすべてのオフセットに対して暗黙的なコミットです。あなたのケースでは、コミットまたは最初の3つのオフセットは、オフセット3 。

関連する問題