2016-04-01 13 views
1

私はJSONファイルを読み込んで使用して、それを生産しようとしていますがカフカ... は、ここに私のコードです:kafka.common.FailedToSendMessageException:カフカは農産物エラー

public class FlatFileDataProducer { 

    private String topic = "JsonTopic"; 
    private Producer<String, String> producer = null; 
    KeyedMessage<String, String> message = null; 
    public JsonReader reader; 

    public void run(String jsonPath) throws ClassNotFoundException, FileNotFoundException, IOException, ParseException{ 
     reader = new JsonReader(); 
     System.out.println("---------------------"); 
     System.out.println("JSON FILE PATH IS : "+jsonPath); 
     System.out.println("---------------------"); 
     Properties prop = new Properties(); 
     prop.put("metadata.broker.list", "192.168.63.145:9092"); 
     prop.put("serializer.class", "kafka.serializer.StringEncoder"); 
     // prop.put("partitioner.class", "example.producer.SimplePartitioner"); 
     prop.put("request.required.acks", "1"); 


     ProducerConfig config = new ProducerConfig(prop); 
     producer = new Producer<String, String>(config); 
     List<Employee> emp = reader.readJsonFile(jsonPath);  
     for (Employee employee : emp) 
     { 
      System.out.println("---------------------"); 
      System.out.println(employee.toString()); 
      System.out.println("---------------------"); 
      message = new KeyedMessage<String, String>(topic, employee.toString()); 

      producer.send(message); 
      producer.close(); 

     } 
     System.out.println("Messages to Kafka successfully"); 
    } 

とJSONファイルを読み込むためのコードです:

public List<Employee> readJsonFile(String path) throws FileNotFoundException, IOException, ParseException{ 
     Employee employee = new Employee(); 
     parser=new JSONParser(); 
     Object obj = parser.parse(new FileReader(path)); 
     JSONObject jsonObject = (JSONObject) obj; 
     employee.setId(Integer.parseInt(jsonObject.get("id").toString()));  
     employee.setName((String)jsonObject.get("name")); 
     employee.setSalary(Integer.parseInt(jsonObject.get("salary").toString())); 
     list.add(employee); 
     return list; 
    } 

しかし、ときに私はプログラムを実行し、 ISSUE 1:

> [[email protected] ~]# java -jar sparkkafka.jar /root/customer.json 
> JSON FILE PATH IS : /root/customer.json 
> log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please 
> initialize the log4j system properly. 
> 1,Smith,25 
> Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages 
> after 3 tries. 
>   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91) 
>   at kafka.producer.Producer.send(Producer.scala:77) 
>   at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
>   at com.up.jsonType.FlatFileDataProducer.run(FlatFileDataProducer.java:41) 
>   at com.up.jsonType.FlatFileDataProducer.main(FlatFileDataProducer.java:49) 

それはエラーになりますが、私はcosumerシェルをチェックするとき、私は以下のように取得する:

[ルート@サンドボックスビン]# :JSONファイルで1行分の私はシェルで4エントリ.. ISSUE 2を参照してください[ルート@サンドボックスビン]#./kafka-console-consumer.sh --zookeeperはlocalhost:2181 --topic JsonTopic

1,Smith,25 
1,Smith,25 
1,Smith,25 
1,Smith,25 

--from-始まる私は、同じデータのために4回の行を取得しています。

prop.put("producer.type","async"); 

答えて

1

あなたは以下proprty両方削除する必要があります:

//prop.put("request.required.acks", "1"); 
    //prop.put("producer.type","async"); 

このプロパティはactully承認についての世話をするだろうし

+0

これで問題は解決しました – Alka

1

あなたは、プロパティの下に追加してみてくださいことができます。

+0

この問題を解決しました。 – Alka

関連する問題