2016-04-08 18 views
4

Iは、SparkクイックスタートガイドからJavaで書かれた簡単なコード実行:期待どおりに印刷"Counter value 15"異なる結果

public static void main(String[] args) { 
     SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     Accumulator<Integer> counter = sc.accumulator(0); 
     List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); 
     JavaRDD<Integer> rdd = sc.parallelize(data); 
     rdd.foreach(counter::add); 
     System.out.println("Counter value " + counter); 
} 

を。 私はScalaで書かれた同じロジックとコードがあります。

object Counter extends App { 
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]") 
    val sc = new SparkContext(conf) 
    val counter = sc.accumulator(0) 
    val data = Array(1, 2, 3, 4, 5) 
    val rdd = sc.parallelize(data) 
    rdd.foreach(x => counter += x) 
    println(s"Counter value: $counter") 
} 

をしかし、それたびにプリント不正な結果(< 15)。私のScalaコードで何が間違っていますか?

Java spark lib "org.apache.spark:spark-core_2.10:1.6.1" 
Scala spark lib "org.apache.spark" %% "spark-core" % "1.6.1" 
+0

あなたは 'map'の代わりに、' foreach'を使用して試すことができますか? (結果は破棄されますが、とにかく試してみてください;評価を強制するために 'rdd.count()'のようなものを呼び出してください) – ale64bit

+0

@ ale64bit私は 'rdd.map(x => counter + = x)を実行しました。 count() 'しかし、不正確な結果も生成する – Cortwave

答えて

6

quick-startドキュメントのアドバイスは言う:アプリケーションがscala.Appを拡張する代わりに のmain()メソッドを定義する必要があります

注意を。 scala.Appのサブクラスが正しく動作しないことがあります。

多分これは問題ですか?

はして試してみてください。

object Counter { 
    def main(args: Array[String]): Unit = { 
     val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]") 
     val sc = new SparkContext(conf) 
     val counter = sc.accumulator(0) 
     val data = Array(1, 2, 3, 4, 5) 
     val rdd = sc.parallelize(data) 
     rdd.foreach(x => counter += x) 
     println(s"Counter value: $counter") 
    } 
} 
+0

素晴らしい!それは動作します、ありがとう – Cortwave