私はJSONファイルを読み込んで使用して、それを生産しようとしていますがカフカ... は、ここに私のコードです:kafka.common.FailedToSendMessageException:カフカは農産物エラー
public class FlatFileDataProducer {
private String topic = "JsonTopic";
private Producer<String, String> producer = null;
KeyedMessage<String, String> message = null;
public JsonReader reader;
public void run(String jsonPath) throws ClassNotFoundException, FileNotFoundException, IOException, ParseException{
reader = new JsonReader();
System.out.println("---------------------");
System.out.println("JSON FILE PATH IS : "+jsonPath);
System.out.println("---------------------");
Properties prop = new Properties();
prop.put("metadata.broker.list", "192.168.63.145:9092");
prop.put("serializer.class", "kafka.serializer.StringEncoder");
// prop.put("partitioner.class", "example.producer.SimplePartitioner");
prop.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(prop);
producer = new Producer<String, String>(config);
List<Employee> emp = reader.readJsonFile(jsonPath);
for (Employee employee : emp)
{
System.out.println("---------------------");
System.out.println(employee.toString());
System.out.println("---------------------");
message = new KeyedMessage<String, String>(topic, employee.toString());
producer.send(message);
producer.close();
}
System.out.println("Messages to Kafka successfully");
}
とJSONファイルを読み込むためのコードです:
public List<Employee> readJsonFile(String path) throws FileNotFoundException, IOException, ParseException{
Employee employee = new Employee();
parser=new JSONParser();
Object obj = parser.parse(new FileReader(path));
JSONObject jsonObject = (JSONObject) obj;
employee.setId(Integer.parseInt(jsonObject.get("id").toString()));
employee.setName((String)jsonObject.get("name"));
employee.setSalary(Integer.parseInt(jsonObject.get("salary").toString()));
list.add(employee);
return list;
}
しかし、ときに私はプログラムを実行し、 ISSUE 1:
> [[email protected] ~]# java -jar sparkkafka.jar /root/customer.json
> JSON FILE PATH IS : /root/customer.json
> log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please
> initialize the log4j system properly.
> 1,Smith,25
> Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages
> after 3 tries.
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
> at kafka.producer.Producer.send(Producer.scala:77)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at com.up.jsonType.FlatFileDataProducer.run(FlatFileDataProducer.java:41)
> at com.up.jsonType.FlatFileDataProducer.main(FlatFileDataProducer.java:49)
それはエラーになりますが、私はcosumerシェルをチェックするとき、私は以下のように取得する:
[ルート@サンドボックスビン]# :JSONファイルで1行分の私はシェルで4エントリ.. ISSUE 2を参照してください[ルート@サンドボックスビン]#./kafka-console-consumer.sh --zookeeperはlocalhost:2181 --topic JsonTopic
1,Smith,25
1,Smith,25
1,Smith,25
1,Smith,25
--from-始まる私は、同じデータのために4回の行を取得しています。
prop.put("producer.type","async");
これで問題は解決しました – Alka