2017-10-09 12 views
1

私は、HDFSからデータを読み出すためにスパークストリーミングを使用したいと考えています。アイデアは、別のプログラムが、私のスパークストリーミングジョブが処理するHDFSディレクトリに新しいファイルをアップロードし続けるということです。しかし、私はまた、終了条件が欲しいです。すなわち、HDFSにファイルをアップロードするプログラムが、スパークストリーミングプログラムに信号を送ることができ、すべてのファイルをアップロードする方法である。スパークストリーミングでストップコンディションを作成する方法は?

簡単な例として、Hereからプログラムを入手してください。コードを以下に示します。別のプログラムがそれらのファイルをアップロードしていると仮定して、そのプログラムによって終了条件をどのようにプログラムで通知できますか?(CTRL + Cを押す必要はありません)

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

object StreamingWordCount { 
    def main(args: Array[String]) { 
    if (args.length < 2) { 
     System.err.println("Usage StreamingWordCount <input-directory> <output-directory>") 
     System.exit(0) 
    } 
    val inputDir=args(0) 
    val output=args(1) 
    val conf = new SparkConf().setAppName("Spark Streaming Example") 
    val streamingContext = new StreamingContext(conf, Seconds(10)) 
    val lines = streamingContext.textFileStream(inputDir) 
    val words = lines.flatMap(_.split(" ")) 
    val wc = words.map(x => (x, 1)) 
    wc.foreachRDD(rdd => { 
     val counts = rdd.reduceByKey((x, y) => x + y) 
     counts.saveAsTextFile(output) 
     val collectedCounts = counts.collect 
     collectedCounts.foreach(c => println(c)) 
    } 
    ) 

    println("StreamingWordCount: streamingContext start") 
    streamingContext.start() 
    println("StreamingWordCount: await termination") 
    streamingContext.awaitTermination() 
    println("StreamingWordCount: done!") 
    } 
} 
+0

ジョブの最後に制御バイトを追加し、Spark Streamingプログラムでこれらのバイトを監視し、それらのバイトが一致すると終了することができますか? 0x1c0x0dのようなものを追加しますか?また、このユースケースでSparkストリーミングを使用し、ファイルをアップロードした後で別のジョブを開始しない理由は何ですか? – pjames

答えて

0

OK、取得しました。基本的には、ssc.stop()を呼び出す場所から別のスレッドを作成し、ストリーム処理を停止するように通知します。たとえば、次のようにします。

val ssc = new StreamingContext(sparkConf, Seconds(1)) 
////////////////////////////////////////////////////////////////////// 
val thread = new Thread 
{ 
    override def run 
    { 
     .... 
     // On reaching the end condition 
     ssc.stop() 
    } 
} 
thread.start 
////////////////////////////////////////////////////////////////////// 
val lines = ssc.textFileStream("inputDir") 
..... 
関連する問題