2017-11-05 5 views
0

私はカフカを使い始めました。私は消費者に小さな問題に直面しています。私はJavaで消費者を書いた。カフカJavaコンシューマーは既に閉店

この例外が発生します - IllegalStateExceptionこのコンシューマは既に閉じられています。

私は次の行に例外を取得:

ConsumerRecords<String,String> consumerRecords = consumer.poll(1000); 

私の消費者は、いくつかの例外を除いてクラッシュした後にこれが起こって始めたと私は再びそれを実行しているしようとしたとき、それは私に、この例外を与えました。ここで

は完全なコードです:

package StreamApplicationsTest; 

import org.apache.kafka.clients.consumer.*; 
import org.apache.kafka.common.serialization.StringDeserializer; 

import java.util.*; 

public class StreamAppConsumer { 

public static void main(String[] args){ 
    int i = 0; 
    //List<String> topics = new ArrayList<>(); 
    List<String> topics = Collections.singletonList("test_topic"); 
    //topics.add("test_topic"); 
    Properties consumerConfigurations = new Properties(); 
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); 
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); 
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG,"TestId"); 

    Consumer<String,String> consumer = new KafkaConsumer<>(consumerConfigurations); 
    consumer.subscribe(topics); 

    while(true){ 
     ConsumerRecords<String,String> consumerRecords = consumer.poll(1000); 
     Iterator<ConsumerRecord<String,String>> iterator = consumerRecords.iterator(); 
     while(iterator.hasNext()){ 
      i++; 
      ConsumerRecord<String,String> consumerRecord = iterator.next(); 
      String key = consumerRecord.key(); 
      String value = consumerRecord.value(); 
      if(key=="exit" || value=="exit") 
       break; 
      System.out.println("Key="+key+"\tValue="+value); 
     } 

     System.out.println("Messages processed = "+Integer.toString(i)); 
     consumer.close(); 

    } 
} 
} 

私は助けの任意の並べ替えが有用であろう、この問題で立ち往生しています。

答えて

1

であることを確認してください動作しているようです無限ループなので、消費者が2度目にポーリングすると閉じます。すぐに問題を処理するために、私はwhile(true)ループをtry-catchでラップし、消費者をcatchまたはfinallyブロックで処理します。

ただし、Kafkaのコンシューマで異なるシャットダウン信号が慎重に処理されないと、データが失われる危険性があります。 Confluentの例を見て、優雅な消費者のシャットダウンhereを見てみることをお勧めします。それは内部実行できるよりも一つだけですので、あなたの場合は、メインスレッドで実行しているので、それは次のようになりたいで...

public static void main(String[] args) { 
    int i = 0; 
    //List<String> topics = new ArrayList<>(); 
    List<String> topics = Collections.singletonList("test_topic"); 
    //topics.add("test_topic"); 
    Properties consumerConfigurations = new Properties(); 
    consumerConfigurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    consumerConfigurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
    consumerConfigurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
    consumerConfigurations.put(ConsumerConfig.GROUP_ID_CONFIG, "TestId"); 

    Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigurations); 
    consumer.subscribe(topics); 

    Runtime.getRuntime().addShutdownHook(new Thread() 
    { 
     public void run() { 
     consumer.wakeup(); 
     } 
    }); 

    try { 
     while (true) { 
     ConsumerRecords<String, String> consumerRecords = consumer.poll(1000); 
     Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator(); 
     while (iterator.hasNext()) { 
      i++; 
      ConsumerRecord<String, String> consumerRecord = iterator.next(); 
      String key = consumerRecord.key(); 
      String value = consumerRecord.value(); 
      if (key == "exit" || value == "exit") 
      break; 
      System.out.println("Key=" + key + "\tValue=" + value); 
     } 
     System.out.println("Messages processed = " + Integer.toString(i)); 
     } 
    } catch (WakeupExection e) { 
     // Do Nothing 
    } finally { 
     consumer.close(); 
    } 
    } 
} 

は基本的にconsumer.wakeup()を実行すると、消費者の唯一のスレッドセーフ方法ですJavaのシャットダウンフックの起床時に消費者が眠っていないので、起きている覚醒時の逸脱をトリップして、消費者を正常に閉鎖する。

0

これは

public static void main(String[] args) { 

     List<String> topics = new ArrayList<>(); 
     topics.add("test.topic"); 

     final Properties props = new Properties(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP_TO_KAFKA_SERVER"); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
     consumer.subscribe(topics); 

     System.out.println("Polling"); 
     ConsumerRecords<String, String> consumerRecords = consumer.poll(5000); 

     try { 
      for (ConsumerRecord<String, String> record : consumerRecords) { 
       System.out.println(record.offset() + ": " + record.value()); 
      } 
     } finally { 
      consumer.close(); 
     } 
    } 

はあなたの最後に消費者を閉じているので、あなたのサーバまたはローカルカフカはこれが起こっている

到達可能な出力

--- exec-maven-plugin:1.2.1:exec (default-cli) @ MVN --- 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
Polling 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
6: test 
7: tes 
関連する問題