Apache Spark Streaming 1.6.1を使用して、2つのKey/Valueデータストリームを結合し、出力をHDFSに書き込むJavaアプリケーションを作成しています。 2つのデータストリームはK/Vストリングを含み、textFileStream()を使用してHDFSからSparkで定期的に取り込まれます。Spark Streamingで複数のバッチ間隔でデータストリームを運ぶ方法
2つのデータストリームは同期していません。つまり、時刻t0でstream1にあるキーが時刻t1にstream2に表示されることがあります。したがって、私の目標は2つのストリームを結合し、次のバッチ間隔で結合操作のために考慮すべき "残りの"キーを計算することです。私は失敗したスパークストリーミングで、このアルゴリズムを実装しようとした
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
:より良い本を明確にするために
は、以下のアルゴリズムを見てください。当初、私は(これは1つのストリームだけであるが、第二の流れを生成するためのコードが似ている)、このように残りのキーの2つの空のストリームを作成します。
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String>() {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
後で、この空のストリームが統一されている(すなわち、 union())をstream1で処理し、最後にjoin1の後にstream1から残ったキーを追加してwindow()を呼び出します。 stream2でも同じことが起こります。
left_keys_s1とleft_keys_s2を生成する操作がアクションのない変換であるため、SparkはRDDフローグラフを作成せず、実行されません。私が今得するのは、キーが同じ時間間隔でstream1とstream2にあるレコードだけを出力する結合です。
Sparkでこれを正しく実装する提案はありますか?
おかげで、
マルコ