2016-08-25 10 views
0

私はApache kafkaからelasticsearchにデータを処理するためのプログラミングに取り組んでいます。その目的のために私はApache Sparkを使用しています。私は多くのリンクを通過しましたが、JavaDStreamのデータをApacheのsparkでelasticsearchに書き込む例は見つかりませんでした。ApacheのJavaDStream <String>からデータを書き込むとelasticsearchに書き込まれます

以下は、kafkaからデータを取得して印刷するsparkのサンプルコードです。弾性検索に保存する

import org.apache.log4j.Logger; 
import org.apache.log4j.Level; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.Map; 
import java.util.Set; 
import java.util.regex.Pattern; 

import scala.Tuple2; 

import kafka.serializer.StringDecoder; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.streaming.api.java.*; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.Durations; 
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; 

import com.google.common.collect.ImmutableMap; 

import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 

import java.util.List; 
public class SparkStream { 

    public static JavaSparkContext sc; 
    public static List<Map<String, ?>> alldocs; 

    public static void main(String args[]) 
    { 
     if(args.length != 2) 
     { 
      System.out.println("SparkStream <broker1-host:port,broker2-host:port><topic1,topic2,...>"); 

      System.exit(1); 
     } 

     Logger.getLogger("org").setLevel(Level.OFF); 
     Logger.getLogger("akka").setLevel(Level.OFF); 
     SparkConf sparkConf=new SparkConf().setAppName("Data Streaming"); 
     sparkConf.setMaster("local[2]"); 
     sparkConf.set("es.index.auto.create", "true"); 
     sparkConf.set("es.nodes","localhost"); 
     sparkConf.set("es.port","9200"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 

     Set<String> topicsSet=new HashSet<>(Arrays.asList(args[1].split(","))); 
     Map<String,String> kafkaParams=new HashMap<>(); 
     String brokers=args[0]; 
     kafkaParams.put("metadata.broker.list",brokers); 
     kafkaParams.put("auto.offset.reset", "largest"); 
     kafkaParams.put("offsets.storage", "zookeeper"); 
     JavaPairDStream<String, String> messages=KafkaUtils.createDirectStream(
       jssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topicsSet 
     ); 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      @Override 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 
     lines.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

`

答えて

0

一つの方法は、foreachRDD関数内saveToEsメソッドを使用しています。あなたが使用したい他の方法でも、あなたのdstreamへのforeachRDDコールが必要です。例えば

lines.foreachRDD(lambda rdd: rdd.saveToEs("ESresource")) 

See here for more

+0

javaの構文が表示されません。 – Gaurav

+0

上記の構文は、pyspark用です。 Javaの場合、 'import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;を試してください。 JavaEsSpark.saveToEs(javaRDD、 "ESresource"); ' –

0
dstream.foreachRDD{rdd=> 
     val es = sqlContext.createDataFrame(rdd).toDF("use headings suitable for your dataset") 
     import org.elasticsearch.spark.sql._ 
     es.saveToEs("wordcount/testing") 
    es.show() 
} 

このコードブロック "DSTREAM" においては、カフカのようなサーバーからのデータを観察するデータストリームです。 "toDF()"の括弧の中に見出しをつけなければなりません。 "saveToES()"ではelasticsearchインデックスを使用しています。これの前に、SQLContextを作成します。

val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate()) 

あなたは

この例で
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1" 

Get the dependency

To see full example see

の下に言及した依存関係を追加する必要があるデータを送信するためにカフカを使用している場合、最初のあなたは「カフカのプロデューサーを作成する必要がありますテストしてからelasticsearchを開始してください プログラムを実行した後。あなたは上記のURLを使用して完全なsbtとコードを見ることができます。

関連する問題