2016-10-18 13 views
1

私はManningの統一ログ処理の本を読んでいます。最初の練習はJavaの単純なカフカ消費者です。プログラムが実行されると、consumer.poll()が呼び出されます。kafka Javaクライアントは消費しません - consumer.pollでハングアップします

私は本の著者によって提供さvalgrindの環境内部からこれを実行していますし、それがgit clone https://github.com/alexanderdean/Unified-Log-Processing.git

で利用可能であることがあるzookeeper-3.4.6kafka_2.10-0.8.2.1

私は、次のコマンドラインを使用してトピックを作成しました:

./kafka-topics.sh --create --topic raw --zookeeper localhost:2181 --replication-factor 1 --partitions 1 
Created topic "raw". 

kafka-console-producerおよびkafka-console-consumerは期待どおり動作しています。

./kafka-console-producer.sh --topic raw --broker-list localhost:9092 
[2016-10-17 14:09:05,899] WARN Property topic is not valid (kafka.utils.VerifiableProperties) 
one 
two 
three 
four 
five 


./kafka-console-consumer.sh --topic raw --from-beginning --zookeeper localhost:2181 
one 
two 
three 
four 
five 
^CConsumed 5 messages 

私がテストしているJavaコードはかなり基本的なもので、コンシューマを作成するだけです。私が使用しています

StreamApp.java

package nile; 

public class StreamApp {               

    public static void main(String[] args){          
     String servers = args[0];            
     String groupId = args[1]; 
     String inTopic = args[2]; 
     String goodTopic = args[3];            

     Consumer consumer = new Consumer(servers, groupId, inTopic);    
     consumer.run();            
    }                   
} 

Consumer.java

package nile; 

import java.util.*; 

import org.apache.kafka.clients.consumer.*; 

public class Consumer {                                                

    private final KafkaConsumer<String, String> consumer;    // a                                   
    private final String topic;                                              

    public Consumer(String servers, String groupId, String topic) {                                     
     this.consumer = new KafkaConsumer<String, String>(createConfig(servers, groupId));                               
     this.topic = topic;                                               
     System.out.println("Topic to listen for:" + this.topic + ":");                                    
    }                                                    

    public void run() {                                                
     System.out.println("Starting to listen for items ");                                      
     this.consumer.subscribe(Arrays.asList(this.topic));    // b                                  
     try {                                                  
      while (true) {                                               
       System.out.println("Subscribed to: " + consumer.subscription());                                 
       System.out.println("Inside the loop");                                        
       ConsumerRecords<String, String> records = consumer.poll(100); // c                                 
       System.out.println("After consuming");                                        
       for (ConsumerRecord<String, String> record : records) {                                    
        System.out.println("Got an item from kafka: " + record.value());                                
       }                                                 
      }                                                  
     } finally {                                                 
      consumer.close();                                              
     }                                                   
    }                                                    

    private static Properties createConfig(String servers, String groupId) {                                  

     Properties props = new Properties();                                          
     props.put("bootstrap.servers", servers);                                         
     props.put("group.id", groupId);         // e                                  
     props.put("enable.auto.commit", "true");                                         
     props.put("auto.commit.interval.ms", "1000");                                        
     props.put("auto.offset.reset", "earliest");                                         
     props.put("session.timeout.ms", "30000");                                         
     props.put("key.deserializer",                                            
        "org.apache.kafka.common.serialization.StringDeserializer"); // a                                
     props.put("value.deserializer",                                            
        "org.apache.kafka.common.serialization.StringDeserializer"); // a     
     return props;                                                
    }                                                    
} 

ライブラリは、(私のbuild.gradleから)私のようにコードを実行しています

dependencies {          // b 
    compile 'org.apache.kafka:kafka-clients:0.9.0.0' 
    compile 'com.maxmind.geoip:geoip-api:1.2.14' 
    compile 'com.fasterxml.jackson.core:jackson-databind:2.6.3' 
    compile 'org.slf4j:slf4j-api:1.7.5' 
} 

、次のとおりです。

java -jar ./build/libs/nile-0.1.0.jar localhost:9092 ulp-ch03-3.3 raw enriched 

出力は次のとおりです。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
Topic to listen for:raw: 
Starting to listen for items 
Subscribed to: [raw] 
Inside the loop 

そして、それが停止したところです。 consumer.poll()は何も返さず、タイムアウトしていません。ここで何が間違っているか分かりません。 2日間私の髪を分けていたら、この作業をするための助けがあれば大いに感謝します。 :)

答えて

1

0.9.xコンシューマAPIを使用して0.8.xサーバからのメッセージを消費しているようですが、0.9.0.0では許可されていないため、以前のバージョンとのブローカ間プロトコルが変更されています。 古いコンシューマ(つまりScalaコンシューマ)を使用するか、kafkaサーバのバージョンを0.9.xにアップグレードしてください。

+0

ありがとうございました!私はkafkaをアップグレードし、今はプログラムが完璧に動作しています。 – Raj

関連する問題