2017-07-10 16 views
0

CSVファイルからKafkaにデータを送信できません。ここでは、バッチ処理スプリングブートバッチファイルが読み込まれ、カフカにデータが送信される


import java.util.List; 

import javax.persistence.criteria.CriteriaBuilder.In; 

import org.springframework.batch.item.ItemWriter; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.kafka.core.KafkaTemplate; 

import com.codenotfound.kafka.repository.*; 

import java.util.*; 

import com.codenotfound.kafka.Car; 
import com.codenotfound.kafka.producer.Sender; 


public class Writer implements ItemWriter<Car>{ 

    private final Repository repo; 

    public Writer(Repository repo) { 
     this.repo = repo ;  
    } 

    @Override 
    public void write(List<? extends Car> car) throws Exception { 
     repo.save(car); 
    } 
} 

ので、代わりのrepo.save(車)ためwriter.javaのための私のコードは、私がこの車クラスの詳細はカフカに送信することにしたいです。

:ここ はカフカのための私のSenderファイルがそれぞれ自分の車のクラスとリポジトリのインターフェース


@Entity 
@Table(name = "Car") 

public class Car { 


    private String make; 

    private String manufacturer; 
    @Id 
    @GeneratedValue(strategy = GenerationType.AUTO) 
    private long id; 

    public Car() { 
    //super(); 
    } 

    public Car(String make, String manufacturer) { 
    super(); 
    this.make = make; 
    this.manufacturer = manufacturer; 
    } 

    public String getMake() { 
    return make; 
    } 

    public void setMake(String make) { 
    this.make = make; 
    } 

    public String getManufacturer() { 
    return manufacturer; 
    } 


    public void setManufacturer(String manufacturer) { 
    this.manufacturer = manufacturer; 
    } 

    public long getId() { 
    return id; 
    } 


    public void setId(long id) { 
    this.id = id; 
    } 

    @Override 
    public String toString() { 
    return "Car [make=" + make + ", manufacturer=" + manufacturer + ", id=" + id + "]"; 
    } 
} 

とリポジトリクラス


public interface Repository extends CrudRepository<Car, Long>,CustomRepository { 
} 

です10


package com.codenotfound.kafka.producer; 

import java.util.List; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.kafka.core.KafkaTemplate; 

import com.codenotfound.kafka.Car; 

public class Sender { 

    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); 

    @Value("${topic.json}") 
    private String jsonTopic; 

    @Autowired 
    private KafkaTemplate<String, Car> kafkaTemplate; 

    public void send(Car car) { 
    LOGGER.info("sending car='{}'", car.toString()); 
    kafkaTemplate.send(jsonTopic, car); 
    } 
} 

私には私のカフカにCSVファイルからデータを送信する方法を提案してください。

答えて

0

あなたはすでにあなたのために配置されたパズルのすべての部分を持っているようです。何をする必要があなたはこのようなものがあるだろうので、あなたの送信者クラスを使用してItemWriterを変更です:

@Component 
public class Writer implements ItemWriter<Car> { 

    @Value("${topic.json}") 
    private String jsonTopic; 

    @Autowired 
    private KafkaTemplate<String, Car> kafkaTemplate; 

    @Override 
    public void write(List<? extends Car> cars) throws Exception { 
     cars.forEach(car -> kafkaTemplate.send(jsonTopic, car)); 
    } 
} 

とあなたの仕事の設定では、あなたはこのようにそれをautowireする必要があります(以下のコードは、単純化されちょうどステップで作家)宣言する方法を示す:

@Configuration 
@EnableBatchProcessing 
public class BatchConfiguration { 

    @Autowired 
    public StepBuilderFactory stepBuilderFactory; 

    @Autowired 
    private Writer carWriter; 

    @Bean 
    public Step myStep() { 
     this.stepBuilderFactory.get("myStep") 
      .<Car,Car> chunk(10) 
      .reader(reader()) 
      .writer(carWriter) // don't use new here 
      .build(); 
    } 
} 
+0

おかげで...私はすでにこのアイデアを適用したがKafKaTemplateオートワイヤリングは、それが初期化/設定される前に行わ行われます。だから私はAutowiringのnullポインタを取得しています。私はこのオートワイヤリングを遅らせるために@Order()を試しましたが、成功しませんでした。 –

+0

これはどういうことですか?作者に '@ Component'を注釈して、あなたのジョブ設定でautowireする必要があります。 'new'を使って作家を作っているのであれば、autowireはしません。 – Tarek

関連する問題