2017-05-18 2 views
1

.collect()を呼び出さずに、RDDの各要素に対してドライバでアクションを実行します。最初のアイデアはRDD.toLocalIterator()を使用することです:RDD.toLocalIterator eager評価

val config = new SparkConf().setMaster("local[10]").setAppName("xxx") 
val sc: SparkContext = new SparkContext(config) 
val ints: RDD[Int] = sc.parallelize(1 to 50) 
val doubled = ints.map(i => { 
    Thread.sleep(200) 
    println(s"map $i" + Thread.currentThread()) 
    i * 2 
}) 

doubled.toLocalIterator.foreach(i => { 
    println(s"got $i" + Thread.currentThread()) 
}) 

しかし、この場合には次のパーティションの計算は、前のパーティションを消費した後に開始されます。したがって、全体の計算には時間がかかりすぎます。 私は、次のハックを発明:

doubled.cache() 
//force rdd to be materialized 
println(doubled.count()) 
//traverse cached rdd 
doubled.toLocalIterator.foreach(i => { 
    println(s"got $i" + Thread.currentThread()) 
}) 

は、任意のより良い解決策はありますか?

答えて

-1

なぜあなたはRDD.foreachメソッドを使用しないのですか?私はこれがあなたの例題と同じように実行できると思います。これを使用して、並列処理の利益を得ることができます。RDD

+0

'foreach'はワーカーで実行されます。ドライバで関数を実行する必要があります。 – simpadjo

+0

ドライバで関数を実行すると、 sparkが提供する並列処理 –