2017-06-17 10 views
1

SparkContextに内容が実行されていて、すべて終了したらどうすれば止めるのですか?現在、私はSparkContext.stopを呼び出す前に30秒待っています。さもなければ、私のアプリはエラーをスローします。SparkContextがすべての処理を終了するのを待つ方法は?

import org.apache.log4j.Level 
import org.apache.log4j.Logger 
import org.apache.spark.SparkContext 

object RatingsCounter extends App { 

    // set the log level to print only errors 
    Logger.getLogger("org").setLevel(Level.ERROR) 

    // create a SparkContext using every core of the local machine, named RatingsCounter 
    val sc = new SparkContext("local[*]", "RatingsCounter") 

    // load up each line of the ratings data into an RDD (Resilient Distributed Dataset) 
    val lines = sc.textFile("src/main/resource/u.data", 0) 

    // convert each line to s string, split it out by tabs and extract the third field. 
    // The file format is userID, movieID, rating, timestamp 
    val ratings = lines.map(x => x.toString().split("\t")(2)) 

    // count up how many times each value occurs 
    val results = ratings.countByValue() 

    // sort the resulting map of (rating, count) tuples 
    val sortedResults = results.toSeq.sortBy(_._1) 

    // print each result on its own line. 
    sortedResults.foreach { case (key, value) => println("movie ID: " + key + " - rating times: " + value) } 

    Thread.sleep(30000) 

    sc.stop() 
} 
+0

スカラ座= 2.11.8と火花= 1.6.1 –

+0

は、あなたの中にあなたの主な機能を入れているオブジェクトを共有することはできますか? – eliasah

+1

あなたは、mainをdefに拡張するのではなく、sc.textfileの2番目の引数に1を加えて試すことができますか? –

答えて

5

スパークアプリケーション代わりscala.Appを拡張するmain()方法を定義する必要があります。サブクラスのscala.Appが正しく動作しないことがあります。

あなたがアプリケーションを拡張しているので、予期しない動作が発生しています。

詳細については、Self Contained Applicationsの公式ドキュメントを参照してください。

AppDelayedInitを使用し、初期化の問題を引き起こす可能性があります。主な方法で、あなたは何が起こっているのか知っています。 Excerpt from reddit.

object HelloWorld extends App { 
    var a = 1 
    a + 1 
    override def main(args: Array[String]) { 
    println(a) // guess what's the value of a ? 
    } 
} 
+0

私はScalaでmainメソッドを定義する方法を知っています。私の問題は、どのように質問に書いてあったか、SparkContextからコールバックメソッドを定義する方法です。 –

+0

睡眠を追加しないとエラーが発生したと言われました。あなたの実際のコードに問題がある理由について私は答えました。コールバックの場合、spark-jobserverのようなものを使用する必要があります。これは、sparkが他の重いScalaアプリケーションと同様にコールバックを返すことができないためです。 – eliasah

+0

私はこれがあなたのエラーだと信じています。なぜなら、現在、私は30秒前にSparkContext.stopを呼び出すのを待っているから、そうでなければ私のアプリはエラーをスローします。そして、それはコールバックのためではありません... – eliasah

関連する問題