2016-11-22 15 views
0

計算速度を落とすために、Pythonでマルチプロセッシングを構築しようとしていますが、マルチプロセッシング後のように全体の計算速度が大幅に低下しました。私は4つの異なるプロセスを作成し、各プロセスへの入力となる4つの異なるデータフレームにdataFrameを分割しました。各プロセスのタイミングをとった後、オーバーヘッドコストが重要であるように見え、これらのオーバーヘッドコストを削減する方法があるかどうか疑問に思っていました。Pythonでマルチプロセッシングの時間を短縮する方法

私はwindows7、python 3.5を使用しています。私のマシンには8つのコアがあります。

def doSomething(args, dataPassed,): 

    processing data, and calculating outputs 

def parallelize_dataframe(df, nestedApply): 
    df_split = np.array_split(df, 4) 
    pool = multiprocessing.Pool(4) 
    df = pool.map(nestedApply, df_split) 
    print ('finished with Simulation') 
    time = float((dt.datetime.now() - startTime).total_seconds()) 

    pool.close() 
    pool.join() 

def nestedApply(df): 

    func2 = partial(doSomething, args=()) 
    res = df.apply(func2, axis=1) 
    res = [output Tables] 
    return res 

if __name__ == '__main__': 

data = pd.read_sql_query(query, conn) 

parallelize_dataframe(data, nestedApply) 
+2

シングルスレッド化がマルチプロセッシングにどれくらいかかっていたかを列挙できますか? – Fruitspunchsamurai

+0

あなたは何個のCPU /コアを持っていますか(ハイパースレッドではなく、実際のもの)? CPU集約的な作業のように見えるので、コアの数よりも多くの数に分割すると、処理が遅くなります。また、データフレームはどれくらい大きく、 'doSomething'はどれくらいの費用がかかりますか?データフレームを各サブプロセスに渡すには、シリアル化する必要があり(ピクルス経由)、デシリアライズする必要があります。フレームが大きく、 'doSomething'が安ければ、ほとんどの時間がオーバーヘッドに費やされます。 –

+0

@ Fruitspunchsamurai シングルスレッドを実行するのに26分かかりましたが、マッピング機能を実行するのに33分、全体的に71分かかりました。 – Hojin

答えて

0

DataFrameをチャンクとして提供する代わりに、キューを使用することをお勧めします。各チャンクをコピーするためには多くのリソースが必要です。そうするにはかなりの時間がかかります。あなたのDataFrameが本当に大きい場合は、メモリが足りなくなる可能性があります。キューを使用すると、パンダの高速イテレータから利益を得ることができます。 ここに私のアプローチがあります。オーバーヘッドは、作業者の複雑さに伴い減少します。残念ながら、私の労働者はそれを実際に示すのは簡単ですが、sleepは少し複雑です。 numProc = 2を使用して

import pandas as pd 
import multiprocessing as mp 
import numpy as np 
import time 


def worker(in_queue, out_queue): 
    for row in iter(in_queue.get, 'STOP'): 
     value = (row[1] * row[2]/row[3]) + row[4] 
     time.sleep(0.1) 
     out_queue.put((row[0], value)) 

if __name__ == "__main__": 
    # fill a DataFrame 
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD')) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue)) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    # iterator over rows 
    it = df.itertuples() 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for i in range(len(df)): 
     while out_queue.empty(): 
      # fill the queue 
      try: 
       row = next(it) 
       in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True) # row = (index, A, B, C, D) tuple 
      except StopIteration: 
       break 
     row_data = out_queue.get() 
     df.loc[row_data[0], "Result"] = row_data[1] 

    # signals for processes stop 
    for p in process: 
     in_queue.put('STOP') 

    # wait for processes to finish 
    for p in process: 
     p.join() 

それはnumProc = 4で、それが倍の速度で、ループごとに50秒かかります。

関連する問題