2016-09-28 11 views

答えて

0

スパークストリーミングを使用すると、カフカのトピックからデータを消費することができます。

あなたはカフカのトピックにレコードを公開したい場合は、カフカProducerを使用することができます[https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example]

それとも、複数のソースのコネクタを使用してカフカトピックにデータを公開するカフカConnectを使用することができます。[http://www.confluent.io/product/connectors/]

ご覧ください。 SparkストリーミングとKafkaの統合の詳細については、以下のリンクを参照してください。

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

0

私はJavaを使用してそれを行っています。この関数をJavaDStream<String>に渡して、.foreachRDD()の引数として使用できます。各RDDに対してKafkaProducerを作成するための最良の方法ではありません。socket example in Spark documentationのようなKafkaProducersの「プール」を使用してこれを行うことができます。ここで

は私のコードです:

public static class KafkaPublisher implements VoidFunction<JavaRDD<String>> { 
    private static final long serialVersionUID = 1L; 

    public void call(JavaRDD<String> rdd) throws Exception { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "loca192.168.0.155lhost:9092"); 
     props.put("acks", "1"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1000); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     rdd.foreachPartition(new VoidFunction<Iterator<String>>() { 
      private static final long serialVersionUID = 1L; 

      public void call(Iterator<String> partitionOfRecords) throws Exception { 
       Producer<String, String> producer = new KafkaProducer<>(props); 
       while(partitionOfRecords.hasNext()) { 
        producer.send(new ProducerRecord<String, String>("topic", partitionOfRecords.next())); 
       } 
       producer.close(); 
      } 
     }); 
    } 
} 
関連する問題