私は奇妙な何かを満たしスパークmapPatartitionsを使用する場合、mutable.HashSetが作成されたが正しくマッププロセスに充填することができない、ここでのコードは次のとおりです。の実行順序
object Test {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(conf)
val input = List[String]("1", "2", "3", "3", "4", "5", "5")
val result = sc.parallelize(input)
.mapPartitions((pi: Iterator[String]) => {
val valuesInPartition = new mutable.HashSet[String]()
val values = pi.map(line => {
valuesInPartition.add(line)
println("processing line: " + line + ", valuesInPartition: " + valuesInPartition)
})
println("valuesInPartition: " + valuesInPartition)
values
})
result.collect
}
}
と出力:
valuesInPartition: Set()
processing line: 1, valuesInPartition: Set(1)
processing line: 2, valuesInPartition: Set(1, 2)
processing line: 3, valuesInPartition: Set(3, 1, 2)
processing line: 3, valuesInPartition: Set(3, 1, 2)
processing line: 4, valuesInPartition: Set(3, 4, 1, 2)
processing line: 5, valuesInPartition: Set(3, 4, 1, 5, 2)
processing line: 5, valuesInPartition: Set(3, 4, 1, 5, 2)
私が理解しているように、mapPartition内のコードはシーケンシャルに実行する必要があります。マップ機能が終了したら、最初の行を最後に印刷することになっています。しかし、ここでは、Setは値を付けずに印刷されます。
私はここで何か間違ったことを理解していると思います。
ありがとう – Vulcann