2017-03-08 7 views
0

私はKafkaを初めて使用しました。現在、Kafka Providerからデータを取得するために既存のプログラム(Kafka Consumer)を使用しようとしていました。Kafkaプロデューサーが共有する最新のデータを取得しないコンシューマー

私の関心事は、 私のコンシューマーによるフェッチが終わったら、プロバイダは新しいデータセットを再び共有できます。 私の消費者がデータをフェッチできるようにするにはどうすればよいですか?

下記のコードをご覧ください。

 import kafka.consumer.ConsumerConfig; 
     import kafka.consumer.KafkaStream; 
     import kafka.javaapi.consumer.ConsumerConnector; 

     import java.util.HashMap; 
     import java.util.List; 
     import java.util.Map; 
     import java.util.Properties; 
     import java.util.concurrent.ExecutorService; 
     import java.util.concurrent.Executors; 
     import java.util.concurrent.TimeUnit; 

     public class ConsumerGroupExample { 
      private final ConsumerConnector consumer; 
      private final String topic; 
      private ExecutorService executor; 

      public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { 
       consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
         createConsumerConfig(a_zookeeper, a_groupId)); 
       this.topic = a_topic; 
      } 

      public void shutdown() { 
       if (consumer != null) consumer.shutdown(); 
       if (executor != null) executor.shutdown(); 
       try { 
        if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { 
         System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); 
        } 
       } catch (InterruptedException e) { 
        System.out.println("Interrupted during shutdown, exiting uncleanly"); 
       } 
      } 

      public void run(int a_numThreads) throws InterruptedException { 
       Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
       topicCountMap.put(topic, new Integer(a_numThreads)); 
       Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 

       //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

       // System.out.println(streams.size()); 
       // now launch all the threads 
       // 
       executor = Executors.newFixedThreadPool(a_numThreads); 
       List<KafkaStream<byte[], byte[]>> streams = null; 
       // now create an object to consume the messages 
       // 
       int threadNumber = 0; 
    boolean keepRunningThread = false; 
    for (;;) { 

        streams = consumerMap.get(topic); 
        for (final KafkaStream stream : streams) { 
         keepRunningThread =true; 
         executor.submit(new ConsumerTest(stream, threadNumber,keepRunningThread)); 
         //threadNumber++; 

        } 
        //TimeUnit.MILLISECONDS.sleep(100); 
        //System.out.println("Going to sleep "); 
       } 

      private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { 
       Properties props = new Properties(); 
       props.put("zookeeper.connect", a_zookeeper); 
       props.put("group.id", a_groupId); 
       props.put("zookeeper.session.timeout.ms", "1600"); 
       props.put("zookeeper.sync.time.ms", "200"); 
       props.put("consumer.timeout.ms","10"); 
       props.put("auto.offset.reset", "smallest"); 
       props.put("auto.commit.interval.ms", "1000"); 
       //props.put("key.deserializer", 
       //  "org.apache.kafka.common.serialization.StringDeserializer"); 
       //  props.put("value.deserializer", 
       //  "org.apache.kafka.common.serialization.StringDeserializer"); 

       return new ConsumerConfig(props); 
      } 

      public static void main(String[] args) throws InterruptedException { 
       String zooKeeper = args[0]; 
       String groupId = args[1]; 
       String topic = args[2]; 
       int threads = Integer.parseInt(args[3]); 

       ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); 
       example.run(threads); 

       try { 
        Thread.sleep(10000); 
       } catch (InterruptedException ie) { 

       } 
       example.shutdown(); 
      } 
     } 






     import kafka.consumer.ConsumerIterator; 
     import kafka.consumer.KafkaStream; 

     public class ConsumerTest implements Runnable { 
      private KafkaStream m_stream; 
      private int m_threadNumber; 
     private boolean keepRunningThread 

      public ConsumerTest(KafkaStream a_stream, int a_threadNumber,boolean keepRunningThread) { 
       m_threadNumber = a_threadNumber; 
       m_stream = a_stream; 
keepRunningThread = keepRunningThread; 
      } 

      public void run() { 
       ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); 
       while(keepRunningThread) 
{ 
    try 
    { 
    if(it.hasNext()) 
    { 
     System.out.println(new String(it.next().message())); 
    } 
    } 
    catch(ConsumerTimeoutException ex) 
    { 
    // Nothing serious timeout exception waiting for kafka message 
    } 
}‍‍‍‍‍‍‍‍‍‍ 

       // System.out.println("Shutting down Thread: " + m_threadNumber); 
      } 
     } 

答えて

0

メッセージがない場合は、読み取りとタイムアウトを処理するようにコードを変更してください。下記のコードは、あなたの消費者を妨げることなくメッセージを読み続けるでしょう。あなたの現在のコードはのために読まれ続けますが(it.hasNext())しかし、それは消費者をブロックします。それはシャットダウン10秒

  // try 
      // { 
      //  Thread.sleep(10000); 
      // } catch (InterruptedException ie) { 

      // } 
      // example.shutdown(); 

後のあなたの消費者はまた、あなたの消費者の構成でconsumer.timeout.msを追加するため)(メインから行を次

削除、他のコードがブロックされます。あなたは(カフカドキュメントからコピーペースト)https://kafka.apache.org/07/documentation.html

を参照してください詳細について consumer.timeout.msを消費者ループ

while(keepRunningThread) 
{ 
    try 
    { 
    if(it.hasNext()) 
    { 
     System.out.println(new String(it.next().message())); 
    } 
    } 
    catch(ConsumerTimeoutException ex) 
    { 
    // Nothing serious timeout exception waiting for kafka message 
    // Wait for 5 seconds 
    Thread.sleep(5000); 
    } 
}‍‍‍‍‍‍‍‍‍‍ 

を終了したいときに制御するkeepRunningThreadフラグを使用します。デフォルトでは、この値は - 消費に利用可能な新しいメッセージがなければ、消費者は無期限にブロックする。値を正の整数に設定すると、指定したタイムアウト値の後にメッセージを使用できない場合にタイムアウト例外がコンシューマにスローされます。

+0

値が-1に設定されている場合、コンシューマは無期限にブロックされます。つまり、プロデューサーが新しいメッセージを送信した場合、消費者は受信できなくなります。 –

+0

また、私は、コードsnnipetで共有されたkeepRunningThreadの使用方法を取得しました –

+0

consumer.timeout.msは消費メッセージを停止しません、それはConsumerTimeoutExceptionをスローする前にit.hasNext()を待つ時間を指定するだけです。これは、(消費者がメインスレッド上にある場合)ハンギングされた消費者の種類を防ぎます。あなたの場合、それは必須ではありませんが、持っていることが良いです。 keepRunningThreadは、コンシューマループを終了するタイミングを制御するフラグです。これを使用して、消費を停止する時間を制御できます(10秒後に消費者シャットダウンを行うのではなく、今行っています)。 これは改善点であり、主な問題ではありません。主な問題は、10秒後にシャットダウンと呼ばれることです – Kaushal

関連する問題