1
私は、HDFSからデータを読み出すためにスパークストリーミングを使用したいと考えています。アイデアは、別のプログラムが、私のスパークストリーミングジョブが処理するHDFSディレクトリに新しいファイルをアップロードし続けるということです。しかし、私はまた、終了条件が欲しいです。すなわち、HDFSにファイルをアップロードするプログラムが、スパークストリーミングプログラムに信号を送ることができ、すべてのファイルをアップロードする方法である。スパークストリーミングでストップコンディションを作成する方法は?
簡単な例として、Hereからプログラムを入手してください。コードを以下に示します。別のプログラムがそれらのファイルをアップロードしていると仮定して、そのプログラムによって終了条件をどのようにプログラムで通知できますか?(CTRL + Cを押す必要はありません)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage StreamingWordCount <input-directory> <output-directory>")
System.exit(0)
}
val inputDir=args(0)
val output=args(1)
val conf = new SparkConf().setAppName("Spark Streaming Example")
val streamingContext = new StreamingContext(conf, Seconds(10))
val lines = streamingContext.textFileStream(inputDir)
val words = lines.flatMap(_.split(" "))
val wc = words.map(x => (x, 1))
wc.foreachRDD(rdd => {
val counts = rdd.reduceByKey((x, y) => x + y)
counts.saveAsTextFile(output)
val collectedCounts = counts.collect
collectedCounts.foreach(c => println(c))
}
)
println("StreamingWordCount: streamingContext start")
streamingContext.start()
println("StreamingWordCount: await termination")
streamingContext.awaitTermination()
println("StreamingWordCount: done!")
}
}
ジョブの最後に制御バイトを追加し、Spark Streamingプログラムでこれらのバイトを監視し、それらのバイトが一致すると終了することができますか? 0x1c0x0dのようなものを追加しますか?また、このユースケースでSparkストリーミングを使用し、ファイルをアップロードした後で別のジョブを開始しない理由は何ですか? – pjames