2017-04-05 10 views
0

Spark 2.0を使用していて、wholeTextFiles APIでファイルをストリーミングしようとしています。私のスパークプログラムが正常にフォルダ内のファイルの最初のバッチを、ファイルを読んでいるが、私は、ファイルの後にバッチをストリーミングすることはできません。..Spark Streaming with wholeTextFiles

私はWholeTextFilesのAPIを使用しているファイルをストリーミングんか教えてください。

SparkConf sparkConf = new SparkConf().setAppName("My app") 
          .setMaster("local") 
          .set("spark.driver.allowMultipleContexts", "true"); 

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(15)); 

JavaPairRDD<String, String> WholeTextLocalFiles = jssc.sparkContext().wholeTextFiles("C:/Users/my/files/abcd7/simple/*.txt"); 

JavaRDD<String> stringRDD = wholeTextFiles.map(
    ----- 
    ---- 
    return mySchema;); 

SQLContext hc = new HiveContext(jssc.sparkContext()); 

Dataset<Row> df = hc.createDataFrame(schemaRDD, mySchema.class); 

df.createOrReplaceTempView("myView"); 

df.show();  
jssc.start(); 
jssc.awaitTermination(); 

スパークは、最初のバッチのためのデータを処理している:

は、ここに私のコードです。しかし、これ以上のバッチはありません。私はここでjavaDStreamを使用していないので、このエラーが発生している可能性があります。どのように私は、wholetextfiles APIからjavaDStreamを取得するのですか?

のUPDATE ERROR:

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554) 
    at com.comcast.emm.vodip.WholeTextLocal.WholeTextLocal.main(WholeTextLocal.java:225) 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:554) 
+0

ストリームを開始しませんでした。あなたのデータで何をすべきかを考えたら、スパークが実際にデータの読み込みと処理を開始するようにしてください。 – Mehraban

+0

jssc.start()を追加しました。 jssc.awaitTermination(); Sparkが最初のバッチのデータを処理しています。しかし、それ以上のバッチは何もありません。エラーが発生しました.DU.show()を実行した後でも、ジョブはストリーミングしていません。 – AKC

答えて

0

RDDを返しますwholeTextFile。何らかのアクションを実行しない限り、sparkは何も操作を開始しません。

「出力操作が登録されていないため、実行する必要はありません」というエラーは、ストリーミングコンテキストをまったく使用していないことを示しています。

ストリーミングジョブの作成方法については、sparkのマニュアルの例を参照してください。

+0

ストリーミングジョブではなく、最初のバッチだけを処理しています。あなたはこのことについて何か指導してください。 – AKC

0

spark docsによれば、データを処理するコアが存在しないため、ストリームを処理するときにmasterをlocalまたはlocal[1]に設定しないでください。

When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).

関連する問題