2017-11-25 20 views
1

私の最初の質問は、Pythonでの並列性に関するものでした。しかし、質問は答えが残っていたので、私はそれを削除し、私は私の結論を要約しようとします。どちらかライブラリをマルチプロセッシングかをマルチスレッドを使用して - うまくいけば、それはあなたのコードが並列で実行させるには、2つの主要な方法がある。一般にシングルスレッドよりも遅いPythonでのマルチプロセッシング

...誰かを助けます。ライブラリをマルチスレッドstackoverflow.comの多くの記事によると

は、スレッド間で効率的にメモリを共有することができますが、シングルコア上でスレッドを実行します。したがって、主にボトルネックがI/O操作の場合は、コードを高速化できます。ライブラリに多くの実用アプリケーションがあるかどうかわかりません。

コードがCPUに制限されている場合は、マルチ処理ライブラリが問題になる可能性があります。ライブラリは個々のコアにスレッドを分散します。しかし、多くの人(私を含む)は、このようなマルチコアコードは、そのシングルコア対応品を大幅に遅くすることができることを観察しました。この問題は、個々のスレッドがメモリを効果的に共有できないために発生したものと思われます。データは大量にコピーされており、かなりのオーバーヘッドが発生します。下のコードが示すように、オーバーヘッドは入力データ型に大きく依存しています。この問題は、LinuxよりもWindows上で深刻なものになっています。 Pythonは並列性を念頭に置いて設計されていなかったようです。

最初のコードは、Processを使用してコア間にpandas dataframeを割り当てます。

import numpy as np 
import math as mth 
import pandas as pd 
import time as tm 
import multiprocessing as mp 

def bnd_calc_npv_dummy(bnds_info, core_idx, npv): 
    """ multiple core dummy valuation function (based on single core function) """ 

    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 

    npv[core_idx] = np.array(bnds_info['npv']) 

def split_bnds_info(bnds_info, cores_no): 
    """ cut dataframe with bond definitions into pieces - one piece per core """ 

    bnds_info_mp = [] 
    bnds_no = len(bnds_info) 
    batch_size = mth.ceil(np.float64(bnds_no)/cores_no) # number of bonds allocated to one core 

    # split dataframe among cores 
    for idx in range(cores_no): 
     lower_bound = int(idx * batch_size) 
     upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no])) 
     bnds_info_mp.append(bnds_info[lower_bound : upper_bound].reset_index().copy()) 

    # return list of dataframes 
    return bnds_info_mp 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    manager = mp.Manager() 
    npv = manager.dict() 

    bnds_info_mp = split_bnds_info(bnds_info, cores_no) 

    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in xrange(cores_no)]  
    [process.start() for process in processes]  
    [process.join() for process in processes] 

    # return NPV of individual bonds  
    return np.hstack(npv.values()) 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = {'currency_name' : 'EUR', 'npv' : 100} 
    bnds_info = pd.DataFrame(bnds_info, index = range(1)) 
    bnds_info = pd.concat([bnds_info] * bnds_no, ignore_index = True) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    npv = np.array(bnds_info['npv']) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

第二のコードは、前のものと同じである - 唯一の違いはの実行時間変化を有する単一コアの実行時間の変化を比較する(この時間は、我々はnumpy array代わりpandas dataframeの使用および性能差が大きいということですマルチコア)。

import numpy as np 
import math as mth 
import time as tm 
import multiprocessing as mp 

def bnd_calc_npv_dummy(bnds_info, core_idx, npv): 
    """ multiple core dummy valuation function (based on single core function) """ 

    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 

    npv[core_idx] = bnds_info 

def split_bnds_info(bnds_info, cores_no): 
    """ cut dataframe with bond definitions into pieces - one piece per core """ 

    bnds_info_mp = [] 
    bnds_no = len(bnds_info) 
    batch_size = mth.ceil(np.float64(bnds_no)/cores_no) # number of bonds allocated to one core 

    # split dataframe among cores 
    for idx in range(cores_no): 
     lower_bound = int(idx * batch_size) 
     upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no])) 
     bnds_info_mp.append(bnds_info[lower_bound : upper_bound]) 

    # return list of dataframes 
    return bnds_info_mp 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    manager = mp.Manager() 
    npv = manager.dict() 

    bnds_info_mp = split_bnds_info(bnds_info, cores_no) 

    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in xrange(cores_no)]  
    [process.start() for process in processes]  
    [process.join() for process in processes] 

    # return NPV of individual bonds  
    return np.hstack(npv.values()) 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = np.array([100] * bnds_no) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

コードの最後の部分は、Pool代わりにProcessを使用しています。ランタイムはやや良いです。

import numpy as np 
import time as tm 
import multiprocessing as mp 

#import pdb 
#pdb.set_trace() 

def bnd_calc_npv_dummy(bnds_info): 
    """ multiple core dummy valuation function (based on single core function) """ 

    try: 
     # get number of bonds 
     bnds_no = len(bnds_info) 
    except: 
     pass 
     bnds_no = 1 

     tm.sleep(0.0001 * bnds_no) 

    return bnds_info 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    pool = mp.Pool(processes = cores_no) 
    npv = pool.map(bnd_calc_npv_dummy, bnds_info.tolist()) 

    # return NPV of individual bonds  
    return npv 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = np.array([100.0] * bnds_no) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

だから、私の結論は、(私はPythonの2.7.13およびWindow 7を使用)並列処理のPython実装は、実際の生活の中では適用されないということです。 敬具、

マッキー

PS:誰かが問題の部分がindependantly計算することができたときに

+4

これをまったく読まないと、間違ったマルチ処理は単一のプロセスより遅くなります。[最小限の、完全で検証可能な例](https://stackoverflow.com/help/mcve)の作成を検討してください。 –

+1

I/Oバインドされたタスク(ファイルの読み込みや書き込みなど)では、 'multiprocessing'モジュールの代わりに' threading'モジュールを考慮する必要があります。マルチプロセッシングは、CPUに束縛されたタスクの方が効果的です。 – ettanany

答えて

1

マルチプロセッシングは、最高の作品...私はもっと楽しくよりも私の心を変更するコードを変更することができる場合、例えばmultiprocessing.Poolがあります。 プール内のすべてのワーカー・プロセスは、入力の一部を処理し、結果をマスター・プロセスに戻します。

データをすべて入力配列に変更する必要がある場合、managerの同期化オーバーヘッドはマルチプロセッシングの利益を破壊する可能性があります。

関連する問題