1

私は、pysparkを使用して1つのRDD上で完全に独立した変換を2つの関数が同時に実行しようとしています。同じことをするいくつかの方法は何ですか?pysparkを使用して1つのRDD上で完全に独立した変換を2つ実行する方法

def doXTransforms(sampleRDD): 
    (X transforms) 

def doYTransforms(sampleRDD): 
    (Y Transforms) 

if __name__ == "__main__": 
    sc = SparkContext(appName="parallelTransforms") 
    sqlContext = SQLContext(sc) 
    hive_context = HiveContext(sc) 

    rows_rdd = hive_context.sql("select * from tables.X_table") 

    p1 = Process(target=doXTransforms , args=(rows_rdd,)) 
    p1.start() 
    p2 = Process(target=doYTransforms, args=(rows_rdd,)) 
    p2.start() 
    p1.join() 
    p2.join() 
    sc.stop() 

これは機能しませんが、これはうまくいきません。 しかし、これを行うには他の方法がありますか?特に、python-spark固有のソリューションはありますか?

+0

通常、それぞれの変換が(ほぼ)100%のクラスタリソースを使用する可能性がある場合、パラレルで実行すると実際には速度が遅くなります。 – ShuaiYuan

答えて

1

スレッドを使用して、クラスタに両方のタスクを同時に処理するのに十分なリソースがあることを確認してください。

from threading import Thread 
import time 

def process(rdd, f): 
    def delay(x): 
     time.sleep(1) 
     return f(x) 
    return rdd.map(delay).sum() 


rdd = sc.parallelize(range(100), int(sc.defaultParallelism/2)) 

t1 = Thread(target=process, args=(rdd, lambda x: x * 2)) 
t2 = Thread(target=process, args=(rdd, lambda x: x + 1)) 
t1.start(); t2.start() 

おそらく、これは実際には便利なことはしばしばありますが、そうでなければうまくいくはずです。

in-application schedulingFAIRスケジューラプールとスケジューラプールで使用すると、実行戦略をより適切に制御できます。

+0

GILのため、スレッドはPythonでは本当に並行ではありません。したがって、上記の方法を使用すると、複数のコアを使用することはできません。 –

+0

それは問題ではありません。このコードで発生する唯一のことはRPC呼び出しです。実際の計算には触れません。非同期呼び出しを使用して単一のスレッドでこれを処理できます。また、[この回答](http://stackoverflow.com/a/38038346/1560062)とその下の私のコメントを参照してください。 – zero323

+0

これは、ありがとう! /etc/hadoop/conf/capacity-scheduler.xmlの yarn.scheduler.capacity.maximum-am-resource-percentを0.1から0.5に変更するだけでした。 –