2016-04-13 15 views
1

私はカフカ、プレイ、スカラーを使用しています。 これは私のコードで、私はkafkaサーバーにメッセージを送信したい、トピック名は "テストトピック"です。私はトピック で私送信されたメッセージを見ていないのに、私はすべてのエラーを取得していない午前 ここカフカトピックにメッセージを送信できません

import kafka.producer.ProducerConfig 
    import java.util.Properties 
    import kafka.producer.Producer 
    import scala.util.Random 
    import kafka.producer.Producer 
    import kafka.producer.Producer 
    import kafka.producer.Producer 
    import kafka.producer.KeyedMessage 
    import java.util.Date 

    object KafkaProducerLocal extends App { 

     sendMessage 

     def sendMessage { 

     val topicName = "test-topic" 
     try { 
      val rnd = new Random() 
      val props = new Properties() 
      props.put("metadata.broker.list", "localhost:9092") //kafka 
      props.put("zk.connect", "localhost:2181"); //zookeeper 
      props.put("serializer.class", "kafka.serializer.StringEncoder") 
      props.put("producer.type", "async") 


      val config = new ProducerConfig(props) 
      val producer = new Producer[String, String](config) 
      val t = System.currentTimeMillis() 
      for (nEvents <- Range(0, 10)) { 
      val ip = "192.168.2." + rnd.nextInt(255); 
      val data = new KeyedMessage[String, String](topicName, ip, "Swapnil Test Data" + nEvents); 
      producer.send(data); 
      } 

      producer.close(); 
     } catch { 
      case t: Throwable => t.printStackTrace() 
     } 
     } 

    } 

答えて

0

何も悪いことは、あなたのコードと間違って何もありませんがあります。

  • それはあなたのクライアントと同じバージョンであるあなたはカフカのどのバージョンを実行しているログ
  • を表示するためにあなたのlog4jプロパティを確認します。
  • サーバが動作してトピックが作成され、コンソールの生産者と消費者example

アプリケーションログ

2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Verifying properties 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Property metadata.broker.list is overridden to localhost:9092 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Property producer.type is overridden to async 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Property serializer.class is overridden to kafka.serializer.StringEncoder 
2016-04-19 01:12:34 WARN kafka.utils.Logging$class:83 - Property zk.connect is not valid 
SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/org/slf4j/slf4j-log4j12/1.7.12/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/home/vishnu/.m2/repository/ch/qos/logback/logback-classic/1.1.2/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Shutting down producer 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Begin shutting down ProducerSendThread 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(topic-test) 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Connected to localhost:9092 for producing 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Disconnecting from localhost:9092 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Connected to HMECL001076:9092 for producing 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Shutdown ProducerSendThread complete 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Closing all sync producers 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Disconnecting from HMECL001076:9092 
2016-04-19 01:12:34 INFO kafka.utils.Logging$class:68 - Producer shutdown completed in 298 ms 
でメッセージを送受信することができるかどうされている場合まずトピック link
  • チェックを作成します。

    コンソールコンシューマ出力

    /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic-test --property group.id cs1 --from-beginning 
    Swapnil Test Data3 
    Swapnil Test Data9 
    Swapnil Test Data2 
    Swapnil Test Data5 
    Swapnil Test Data6 
    Swapnil Test Data8 
    Swapnil Test Data0 
    Swapnil Test Data1 
    Swapnil Test Data4 
    Swapnil Test Data7 
    
  • +0

    作成したトピックを作成しようとしました...ありがとうございました! –

    +0

    props.put( "auto.create.topics.enable"、 "true")が追加されました –

    関連する問題