0
CSVファイルからKafkaにデータを送信できません。ここでは、バッチ処理スプリングブートバッチファイルが読み込まれ、カフカにデータが送信される
import java.util.List;
import javax.persistence.criteria.CriteriaBuilder.In;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.codenotfound.kafka.repository.*;
import java.util.*;
import com.codenotfound.kafka.Car;
import com.codenotfound.kafka.producer.Sender;
public class Writer implements ItemWriter<Car>{
private final Repository repo;
public Writer(Repository repo) {
this.repo = repo ;
}
@Override
public void write(List<? extends Car> car) throws Exception {
repo.save(car);
}
}
ので、代わりのrepo.save(車)ためwriter.javaのための私のコードは、私がこの車クラスの詳細はカフカに送信することにしたいです。
:ここ はカフカのための私のSenderファイルがそれぞれ自分の車のクラスとリポジトリのインターフェース@Entity
@Table(name = "Car")
public class Car {
private String make;
private String manufacturer;
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private long id;
public Car() {
//super();
}
public Car(String make, String manufacturer) {
super();
this.make = make;
this.manufacturer = manufacturer;
}
public String getMake() {
return make;
}
public void setMake(String make) {
this.make = make;
}
public String getManufacturer() {
return manufacturer;
}
public void setManufacturer(String manufacturer) {
this.manufacturer = manufacturer;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
@Override
public String toString() {
return "Car [make=" + make + ", manufacturer=" + manufacturer + ", id=" + id + "]";
}
}
とリポジトリクラス
public interface Repository extends CrudRepository<Car, Long>,CustomRepository {
}
です10
package com.codenotfound.kafka.producer;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import com.codenotfound.kafka.Car;
public class Sender {
private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
@Value("${topic.json}")
private String jsonTopic;
@Autowired
private KafkaTemplate<String, Car> kafkaTemplate;
public void send(Car car) {
LOGGER.info("sending car='{}'", car.toString());
kafkaTemplate.send(jsonTopic, car);
}
}
私には私のカフカにCSVファイルからデータを送信する方法を提案してください。
おかげで...私はすでにこのアイデアを適用したがKafKaTemplateオートワイヤリングは、それが初期化/設定される前に行わ行われます。だから私はAutowiringのnullポインタを取得しています。私はこのオートワイヤリングを遅らせるために@Order()を試しましたが、成功しませんでした。 –
これはどういうことですか?作者に '@ Component'を注釈して、あなたのジョブ設定でautowireする必要があります。 'new'を使って作家を作っているのであれば、autowireはしません。 – Tarek