2017-11-22 26 views
1

私は奇妙な何かを満たしスパークmapPatartitionsを使用する場合、mutable.HashSetが作成されたが正しくマッププロセスに充填することができない、ここでのコードは次のとおりです。の実行順序

object Test { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Test").setMaster("local") 
    val sc = new SparkContext(conf) 
    val input = List[String]("1", "2", "3", "3", "4", "5", "5") 
    val result = sc.parallelize(input) 
     .mapPartitions((pi: Iterator[String]) => { 
     val valuesInPartition = new mutable.HashSet[String]() 
     val values = pi.map(line => { 
      valuesInPartition.add(line) 
      println("processing line: " + line + ", valuesInPartition: " + valuesInPartition) 
     }) 
     println("valuesInPartition: " + valuesInPartition) 
     values 
     }) 
    result.collect 
    } 
} 

と出力:

valuesInPartition: Set() 
processing line: 1, valuesInPartition: Set(1) 
processing line: 2, valuesInPartition: Set(1, 2) 
processing line: 3, valuesInPartition: Set(3, 1, 2) 
processing line: 3, valuesInPartition: Set(3, 1, 2) 
processing line: 4, valuesInPartition: Set(3, 4, 1, 2) 
processing line: 5, valuesInPartition: Set(3, 4, 1, 5, 2) 
processing line: 5, valuesInPartition: Set(3, 4, 1, 5, 2) 

私が理解しているように、mapPartition内のコードはシーケンシャルに実行する必要があります。マップ機能が終了したら、最初の行を最後に印刷することになっています。しかし、ここでは、Setは値を付けずに印刷されます。

私はここで何か間違ったことを理解していると思います。

答えて

4

これはSparkとは何の関係もありません。誤解はIteratorの意味とmapの意味に関するものです。 Iteratorは、一度に1つの要素の構造をトラバースする方法です。 pi.map(line => ...)を呼び出すと別のIteratorが生成されますが、その要素の要求時にはIteratorの各要素を生成する際の副作用が感じられます。

は、以下の(昔ながらのScala)REPLの相互作用を検討:あなたのケースでは

scala> val l1 = List(1,2,3,4,5) 
l1: List[Int] = List(1, 2, 3, 4, 5) 

scala> val l2 = l1.map(println) 
1 
2 
3 
4 
5 
l2: List[Unit] = List((),(),(),(),()) 

scala> val i1 = Iterator(1,2,3,4,5) 
i1: Iterator[Int] = non-empty iterator 

scala> val i2 = i1.map(println)   // Look Ma, nothing happened!! 
i2: Iterator[Unit] = non-empty iterator 

scala> i2.next       // Request the first element... 
1 

scala> i2.next       // Request the second element... 
2 

scala> val l3 = i2.toList    // Request remaining elements. 
3 
4 
5 
l3: List[Unit] = List((),(),()) 

を、valuesに保存されているIteratorは、あなたが匿名関数を終了する(したがってprintln("valuesInPartition: " + valuesInPartition)後)後のみトラバースされます。

+0

ありがとう – Vulcann