2016-05-20 15 views
5

Apache Spark Streaming 1.6.1を使用して、2つのKey/Valueデータストリームを結合し、出力をHDFSに書き込むJavaアプリケーションを作成しています。 2つのデータストリームはK/Vストリングを含み、textFileStream()を使用してHDFSからSparkで定期的に取り込まれます。Spark Streamingで複数のバッチ間隔でデータストリームを運ぶ方法

2つのデータストリームは同期していません。つまり、時刻t0でstream1にあるキーが時刻t1にstream2に表示されることがあります。したがって、私の目標は2つのストリームを結合し、次のバッチ間隔で結合操作のために考慮すべき "残りの"キーを計算することです。私は失敗したスパークストリーミングで、このアルゴリズムを実装しようとした

variables: 
stream1 = <String, String> input stream at time t1 
stream2 = <String, String> input stream at time t1 
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0 
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0 

operations at time t1: 
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2) 
write out_stream to HDFS 
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2) 
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2) 

:より良い本を明確にするために

は、以下のアルゴリズムを見てください。当初、私は(これは1つのストリームだけであるが、第二の流れを生成するためのコードが似ている)、このように残りのキーの2つの空のストリームを作成します。

JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context 
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>(); 
q.add(empty_rdd); 
JavaDStream<String> empty_dstream = jssc.queueStream(q); 
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() { 
           @Override 
           public scala.Tuple2<String, String> call(String s) { 
            return new scala.Tuple2(s, s); 
           } 
           }); 

後で、この空のストリームが統一されている(すなわち、 union())をstream1で処理し、最後にjoin1の後にstream1から残ったキーを追加してwindow()を呼び出します。 stream2でも同じことが起こります。

left_keys_s1とleft_keys_s2を生成する操作がアクションのない変換であるため、SparkはRDDフローグラフを作成せず、実行されません。私が今得するのは、キーが同じ時間間隔でstream1とstream2にあるレコードだけを出力する結合です。

Sparkでこれを正しく実装する提案はありますか?

おかげで、

マルコ

答えて

1

我々これらの値が保持されるRDDへの参照を保持することによって、次の1つのバッチからの値オーバーを実行することが可能であるべきです。

queueDStreamを使用してストリームをマージしないでください。代わりに、各ストリーミング間隔で更新可能な可変RDD参照を宣言してください。

これは一例です。このストリーミングジョブで

、我々は100整数を持ち歩くRDDで始まります。各区間10の乱数が生成され、最初の100個の整数に対して差し引かれます。このプロセスは、100要素の初期RDDが空になるまで続く。この例では、ある区間から次の区間に要素を引き継ぐ方法を示します。私は、これはあなたに完全なユースケースを実装するのに十分な指導を与える願っています

Removing 100 ints by generating random numbers

import scala.util.Random 
    import org.apache.spark.streaming.dstream._ 

    val ssc = new StreamingContext(sparkContext, Seconds(2)) 

    var targetInts:RDD[Int] = sc.parallelize(0 until 100) 

    var loops = 0 

    // we create an rdd of functions that generate random data. 
    // evaluating this RDD at each interval will generate new random data points. 
    val randomDataRdd = sc.parallelize(1 to 10).map(_ =>() => Random.nextInt(100)) 

    val dstream = new ConstantInputDStream(ssc, randomDataRdd) 

    // create values from the random func rdd 

    dataDStream.foreachRDD{rdd => 
         loops += 1 
         targetInts = targetInts.subtract(rdd) 
         if (targetInts.isEmpty) {println(loops); ssc.stop(false)} 
         } 


    ssc.start() 

この例を実行するとtargetInts.countに対してloopsをプロットするには、以下のチャートを提供します。

関連する問題