2016-06-27 26 views
1

私のコードにkafkaserverを埋め込もうとしています。私は以下のサンプルコードを使ってその方法を学びましたが、何らかの理由でプロデューサーが組み込みサーバーにメッセージを送信できません(60秒後にタイムアウトします)。私はカフカ0.8.2.2を使用しています。誰かが私が間違っていることを教えてもらえますか?組み込みカフカにエラーが発生しました

import kafka.api.FetchRequest; 
import kafka.api.FetchRequestBuilder; 
import kafka.javaapi.FetchResponse; 
import kafka.javaapi.TopicMetadata; 
import kafka.javaapi.consumer.SimpleConsumer; 
import kafka.javaapi.message.ByteBufferMessageSet; 
import kafka.message.MessageAndOffset; 
import kafka.producer.ProducerConfig; 
import kafka.server.KafkaConfig; 
import kafka.server.KafkaServer; 
import kafka.utils.Time; 
import kafka.utils.Utils; 
import org.apache.commons.collections.functors.ExceptionPredicate; 
import org.apache.curator.test.TestingServer; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.clients.producer.RecordMetadata; 

import java.io.File; 
import java.nio.ByteBuffer; 
import java.util.Properties; 

public class KafkaLocalBroker { 

public static final String TEST_TOPIC = "test-topic"; 

public KafkaConfig kafkaConfig; 
public KafkaServer kafkaServer; 
public TestingServer zookeeper; 


public KafkaLocalBroker() throws Exception{ 

     zookeeper = new TestingServer(true); 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", zookeeper.getConnectString()); 
     props.put("broker.id", 0); 
     kafkaConfig = new KafkaConfig(props); 

     kafkaServer = new KafkaServer(kafkaConfig, new Time() { 
      public long nanoseconds() { 
       return System.nanoTime(); 
      } 

      public long milliseconds() { 
       return System.currentTimeMillis(); 
      } 

      public void sleep(long ms) { 
       try { 
        Thread.sleep(ms); 
       } catch(InterruptedException e){ 
        // Do Nothing 
       } 
      } 
     }); 
     kafkaServer.startup(); 
     System.out.println("embedded kafka is up"); 
    } 

    public void stop(){ 
     kafkaServer.shutdown(); 
     System.out.println("embedded kafka stop"); 
    } 

    /** 
    * a main that tests the embedded kafka 
    * @param args 
    */ 
    public static void main(String[] args) { 

    KafkaLocalBroker kafkaLocalBroker = null; 
     //init kafka server and start it: 
     try { 
      kafkaLocalBroker = new KafkaLocalBroker(); 
     } catch (Exception e){ 

     } 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("acks", "all"); 
     props.put("retries", 1); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); 

     //send one message to local kafka server: 
     for (int i=0; i<10; i++){ 
      ProducerRecord<String, String> data = new ProducerRecord<String, String>(TEST_TOPIC, "test-message" + i); 
      producer.send(data, (metadata, exception) -> { 
       if (exception != null) { 

        System.out.println("Failed to write log message: " + exception.getMessage()); 

       } else { 
        System.out.println("Successful write to offset {} in partition {} on topic {}: " + 
          metadata.offset() + ", " + metadata.partition() + ", "+ metadata.topic()); 

       } 
      }); 
     } 

     //consume messages from Kafka: 
     SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 10000, 1024000, "clientId"); 
     long offset = 0L; 
     while (offset < 160) { //this is an exit criteria just for this test so we are not stuck in enless loop 
      // create a fetch request for topic “test”, partition 0, current offset, and fetch size of 1MB 
      FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(TEST_TOPIC, 0, offset, 100000).build();//new FetchRequest(TEST_TOPIC, 0, offset, 1000000); 

      // get the message set from the consumer and print them out 
      FetchResponse messages = consumer.fetch(fetchRequest); 
      for(MessageAndOffset msg : messages.messageSet(TEST_TOPIC, 0)) { 

       ByteBuffer payload = msg.message().payload(); 
       byte[] bytes = new byte[payload.limit()]; 
       payload.get(bytes); 
       try { 
        System.out.println(new String(bytes, "UTF-8")); 
       } catch (Exception e){ 

       } 
       // advance the offset after consuming each message 
       offset = msg.offset(); 
      } 
     } 

     producer.close(); 
     //close the consumer 
     consumer.close(); 
     //stop the kafka broker: 
     if(kafkaLocalBroker != null) { 
      kafkaLocalBroker.stop(); 
     } 
    } 
} 

EDIT:

org.apache.kafka.common.errors.TimeoutException:60000ミリ秒後にメタデータの更新に失敗しました、私は以下のプロデューサーから返された例外を含めました。

+1

どのように失敗を評価しますか?特定のエラーメッセージ?問題は、消費者ではなくプロデューサーだと確信していますか? – C4stor

+0

producer.sendは60秒間ハングし、上記の編集に含まれているTimeoutExceptionを吐き出します。 – SChen

+0

あなたはこれを解決しましたか? –

答えて

0

kafkaプロデューサの作成に使用されるプロパティは、0.8では無効です。 producerconfigを実行し、プロパティを変更します。またはkafkaのバージョンを更新してください

+0

'org.apache.kafka.clients.producer.KafkaProducer'または 'org.apache.kafka.producer.Producer'を使用しますか? – SChen

+1

含まれているリンクで指定されたconfigsを使用すると、この例外が発生します:スレッド "main"の例外org.apache.kafka.common.config.ConfigException:デフォルト値がない必須の設定 "bootstrap.servers"がありません。 – SChen

+0

kafkaプロデューサーが少なくともbootstrap.servers、key.serializer、value.serializerを必要とするようです – SChen

関連する問題