2017-04-12 13 views
0

以下は、プロデューサとコンシューマのクラスです。いつデータを生成することができ、次のコードでそれを消費することができません。一度私をここで助けることができますか?私はコーディングに間違っていましたか?私の目的は、消費者からCustomMessageオブジェクトを読んで、そのデータをDBに格納することです。kafkaのプロデューサーから消費者にオブジェクトを消費する方法は?

私のcmdプロンプトでは、zookeeperのインスタンス1、kafkaの1、プロデューサの1、コンシューマ1のインスタンス1を開いています。私は本当に理解していない。プロデューサクラスとコンシューマクラスを実行するときにすべてのインスタンスを保持する必要がありますか?

すべてのポインタが本当に役に立ちます。

ありがとうございます。

producer class::: 

    package com.kafka.test.demo; 

    import java.io.IOException; 
    import java.util.Properties; 

    import javax.xml.parsers.ParserConfigurationException; 

    import org.apache.kafka.clients.producer.KafkaProducer; 
    import org.apache.kafka.clients.producer.ProducerRecord; 
    import org.xml.sax.SAXException; 

    public class KafkaaProducer { 
     public static void main(String[] args) throws ParserConfigurationException, SAXException, IOException { 
      Properties props = new Properties(); 
//customMessage is a pojo object which should be send to the consumer.. 
      CustomMessage customMessage= new CustomMessage(); 
      customMessage.setMessage("hello kafka"); 
      customMessage.setFan("1234213123"); 
      customMessage.setSourceSystem("Dmap"); 
      customMessage.setStatus("Unenrolled"); 
      customMessage.setMessageTyep("Simple Message"); 
      customMessage.setCreatedTime("5"); 
      customMessage.setProcessedTime("6"); 
      customMessage.setRetryCount("3"); 
      props.put("metadata.broker.list", "localhost:9092"); 
      props.put("serializer.class", "kafka.serializer.StringEncoder"); 
      props.put("request.required.acks", "1"); 
      props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
      //CustomMessageSerializer 
props.put("key.serializer","com.kafka.test.demo.CustomMessageSerializer"); 
      props.put("value.serializer", "com.kafka.test.demo.CustomMessageSerializer"); 
      try { 
       KafkaProducer<String, CustomMessage> producer = new KafkaProducer<String, CustomMessage>(props); 
       producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", "customMessage",customMessage)); 
       //producer.send(new ProducerRecord<String, CustomMessage>("NewMessageTopic", customMessage)); 
       System.out.println("Message " + "" + " sent !!"); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    consumer class:: 
    package com.kafka.test.demo; 

    import java.net.UnknownHostException; 
    import java.util.Collections; 
    import java.util.Properties; 

    import org.apache.kafka.clients.consumer.ConsumerRecord; 
    import org.apache.kafka.clients.consumer.ConsumerRecords; 
    import org.apache.kafka.clients.consumer.KafkaConsumer; 

    import com.mongodb.BasicDBObject; 
    import com.mongodb.DB; 
    import com.mongodb.DBCollection; 
    import com.mongodb.DBObject; 
    import com.mongodb.MongoClient; 

    public class KafkaaConsumer { 
     public static void main(String[] args) throws InterruptedException { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", "localhost:2181"); 
      props.put("group.id", "testgroup"); 
      props.put("zookeeper.session.timeout.ms", "4000"); 
      props.put("zookeeper.sync.time.ms", "300"); 
      props.put("rebalance.backoff.ms", "40000"); 
      props.put("bootstrap.servers", "localhost:9092,localhost:9093"); 
      props.put("value.deserializer", "com.kafka.test.demo.CustomMessageDeserializer"); 
      props.put("key.deserializer", "com.kafka.test.demo.CustomMessageDeserializer"); 
      //perisitMessage(); 
      try{ 
       KafkaConsumer<String,CustomMessage> consumer = new KafkaConsumer<String, CustomMessage>(props); 
       consumer.subscribe(Collections.singletonList("NewMessageTopic")); 
       while (true) { 
        ConsumerRecords<String, CustomMessage> messages = consumer.poll(100); 
        for (ConsumerRecord<String, CustomMessage> message : messages) { 
         System.out.println("Message received " + message); 
        } 
        perisitMessage(); 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 

     private static void perisitMessage() { 
      // TODO Auto-generated method stub 
      CustomMessage customMessage = new CustomMessage(); 
      customMessage.setMessage("hello kafka"); 
      customMessage.setFan("1234213123"); 
      customMessage.setSourceSystem("Dmap"); 
      customMessage.setStatus("Unenrolled"); 
      customMessage.setMessageTyep("Simple Message"); 
      customMessage.setCreatedTime("5"); 
      customMessage.setProcessedTime("6"); 
      customMessage.setRetryCount("3"); 
      try { 
       MongoClient mongoClient = new MongoClient("localhost" , 27017); 
       DB db = mongoClient.getDB("DeviceTrack"); 
       DBCollection msgCollection = db.getCollection("messages"); 
       BasicDBObject document = new BasicDBObject(); 
       document.put("message", customMessage.getMessage()); 
       document.put("fan", customMessage.getFan()); 
       document.put("SourceSystem", customMessage.getSourceSystem()); 
       document.put("RetryCount", customMessage.getRetryCount()); 
       document.put("ProcessedTime", customMessage.getProcessedTime()); 
       document.put("CreatedTime", customMessage.getCreatedTime()); 
       document.put("MessageTyep", customMessage.getMessageTyep()); 
       document.put("Status", customMessage.getStatus()); 
       msgCollection.insert(document); 
       System.out.println("Inserted in the data in DB succesfully"); 

      } catch (UnknownHostException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
    } 

package com.kafka.test.demo; 

import java.util.Map; 

import org.apache.kafka.common.serialization.Deserializer; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class CustomMessageDeserializer implements Deserializer { 

    public Object deserialize(String arg0, byte[] arg1) { 
     ObjectMapper mapper = new ObjectMapper(); 
     System.out.println("arg1"+arg1); 
     CustomMessage message = null; 
     try { 
      message = mapper.readValue(arg1, CustomMessage.class); 
     } catch (Exception e) { 

      e.printStackTrace(); 
     } 
     System.out.println(""+message); 
     return message; 
    } 



    public void close() { 
     // TODO Auto-generated method stub 

    } 

    public void configure(Map arg0, boolean arg1) { 
     // TODO Auto-generated method stub 

    } 

} 

    package com.kafka.test.demo; 

    import java.util.Map; 

    import org.apache.kafka.common.serialization.Serializer; 

    import com.fasterxml.jackson.databind.ObjectMapper; 

    public class CustomMessageSerializer implements Serializer { 

     public byte[] serialize(String arg0, Object arg1) { 
      byte[] retVal = null; 
      ObjectMapper objectMapper = new ObjectMapper(); 
      try { 
       retVal = objectMapper.writeValueAsString(arg1).getBytes(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
      System.out.println("value ::::::"+retVal); 
      return retVal; 
     } 

     public void close() { 
      // TODO Auto-generated method stub 

     } 

     public void configure(Map arg0, boolean arg1) { 
      // TODO Auto-generated method stub 

     } 
    } 

package com.kafka.test.demo; 

public class CustomMessage { 

    private String messageId; 
    private String parentMsgId; 
    private String fan; 
    private String message; 
    private String sourceSystem; 
    private String status; 
    private String messageTyep; 
    private String createdTime; 
    private String processedTime; 
    private String retryCount; 

    /** 
    * @return the messageId 
    */ 
    public String getMessageId() { 
     return messageId; 
    } 
    /** 
    * @param messageId the messageId to set 
    */ 
    public void setMessageId(String messageId) { 
     this.messageId = messageId; 
    } 
    /** 
    * @return the parentMsgId 
    */ 
    public String getParentMsgId() { 
     return parentMsgId; 
    } 
    /** 
    * @param parentMsgId the parentMsgId to set 
    */ 
    public void setParentMsgId(String parentMsgId) { 
     this.parentMsgId = parentMsgId; 
    } 
    /** 
    * @return the fan 
    */ 
    public String getFan() { 
     return fan; 
    } 
    /** 
    * @param fan the fan to set 
    */ 
    public void setFan(String fan) { 
     this.fan = fan; 
    } 
    /** 
    * @return the message 
    */ 
    public String getMessage() { 
     return message; 
    } 
    /** 
    * @param message the message to set 
    */ 
    public void setMessage(String message) { 
     this.message = message; 
    } 
    /** 
    * @return the sourceSystem 
    */ 
    public String getSourceSystem() { 
     return sourceSystem; 
    } 
    /** 
    * @param sourceSystem the sourceSystem to set 
    */ 
    public void setSourceSystem(String sourceSystem) { 
     this.sourceSystem = sourceSystem; 
    } 
    /** 
    * @return the status 
    */ 
    public String getStatus() { 
     return status; 
    } 
    /** 
    * @param status the status to set 
    */ 
    public void setStatus(String status) { 
     this.status = status; 
    } 
    /** 
    * @return the messageTyep 
    */ 
    public String getMessageTyep() { 
     return messageTyep; 
    } 
    /** 
    * @param messageTyep the messageTyep to set 
    */ 
    public void setMessageTyep(String messageTyep) { 
     this.messageTyep = messageTyep; 
    } 
    /** 
    * @return the createdTime 
    */ 
    public String getCreatedTime() { 
     return createdTime; 
    } 
    /** 
    * @param createdTime the createdTime to set 
    */ 
    public void setCreatedTime(String createdTime) { 
     this.createdTime = createdTime; 
    } 
    /** 
    * @return the processedTime 
    */ 
    public String getProcessedTime() { 
     return processedTime; 
    } 
    /** 
    * @param processedTime the processedTime to set 
    */ 
    public void setProcessedTime(String processedTime) { 
     this.processedTime = processedTime; 
    } 
    /** 
    * @return the retryCount 
    */ 
    public String getRetryCount() { 
     return retryCount; 
    } 
    /** 
    * @param retryCount the retryCount to set 
    */ 
    public void setRetryCount(String retryCount) { 
     this.retryCount = retryCount; 
    } 
} 
+0

使用しているカフカのバージョンを教えてください。 psの結果は何ですか?グレープカフカ – divyesh

答えて

0

zookeeperとkafkaインスタンスが必要です。

  1. スタート飼育係
  2. スタートカフカ
  3. ( "NewMessageTopic")
  4. 起動し、あなたのトピックを作成して、あなたの生産と消費のコード私は右のあなたを理解していれば

uが使用「kafka-コンソールプロデューサー "&" kafka-console-consumer "?彼らはあなたのカフカクラスターを使う必要はありません。あなたのコードがうまくいくならばcmdを使ってkafkaを起動することが大切だとすれば、.batを書くことができます。それは右見て、あなたのコードの最初の表情で

:startZK 
echo Zookeeper wird gestartet 
Start "Zookeper" C:\zookeeper-3.4.9\bin\zkServer.cmd 
echo Bitte warten bis Zookeeper gestartet ist. 
pause 
echo Kafka Wird Gestartet 
Start "Kafka" C:\kafka_2.11-0.10.2.0\bin\windows\kafka-server-start.bat C:\kafka_2.11-0.10.2.0\config\server.properties 

goto Top 

同様

。 私はあなたがあなたのsystem.outに得るデータを印刷するだけであることを知っていませんか?

  while (true) { 
       ConsumerRecords<String, CustomMessage> messages = consumer.poll(100); 
       for (ConsumerRecord<String, CustomMessage> message : messages) { 
        System.out.println("Message received " + message);<-- just a syso not more :/ 
       } 
       perisitMessage(); <-- maybe give him the message ? 
      } 

あなたのメッセージはアウトラインに表示されますか?それは正常に働いた。今夜は近くで見ることができないのを見てもらえませんか?ちょうど私にヒントを与えてください。しかし、私はMongoDBの経験がありません。

関連する問題