2017-05-26 6 views
0

以下のテストコードを実行して、LinuxでPoolとProcessを使用するパフォーマンスを確認しました。私はPython 2.7を使用しています。マルチプロセッシングのソースコードは、マルチプロセッシングを使用していることを示しています。プロセス。しかし、マルチプロセッシング.Poolは、マルチプロセッシングの同じ数よりも多くの時間とメモリを要します。プロセス、そして私はこれを取得しません。なぜマルチプロセッシング.Poolとマルチプロセッシング.ProcessがLinuxで異なって動作するのですか?

  1. は、大規模な辞書を作成し、サブプロセス:ここ

    は私がやったことです。

  2. 各サブプロセスにdictを読み取り専用で渡します。

  3. 各サブプロセスは計算を行い、小さな結果を返します。以下は

私のテストコードです:ここで

from multiprocessing import Pool, Process, Queue 
import time, psutil, os, gc 

gct = time.time 
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET))) 

def getMemConsumption(): 
    procId = os.getpid() 
    proc = psutil.Process(procId) 
    mem = proc.memory_info().rss 
    return "process ID %d.\nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3) 

def f_pool(l, n, jobID): 
    try: 
     result = {} 
     # example of subprocess work 
     for i in xrange(n): 
      result[i] = l[i] 
     # work done 
     # gc.collect() 
     print getMemConsumption() 
     return 1, result, jobID 
    except: 
     return 0, {}, jobID 

def f_proc(q, l, n, jobID): 
    try: 
     result = {} 
     # example of subprocess work 
     for i in xrange(n): 
      result[i] = l[i] 
     # work done 
     print getMemConsumption() 
     q.put([1, result, jobID]) 
    except: 
     q.put([0, {}, jobID]) 

def initialSubProc(targetFunc, procArgs, jobID): 
    outQueue = Queue() 
    args = [outQueue] 
    args.extend(procArgs) 
    args.append(jobID) 
    p = Process(target = targetFunc, args = tuple(args)) 
    p.start() 
    return p, outQueue 


def track_add_Proc(procList, outQueueList, maxProcN, jobCount, 
        maxJobs, targetFunc, procArgs, joinFlag, all_result): 
    if len(procList) < maxProcN: 
     p, q = initialSubProc(targetFunc, procArgs, jobCount) 
     outQueueList.append(q) 
     procList.append(p) 
     jobCount += 1 
     joinFlag.append(0) 
    else: 
     for i in xrange(len(procList)): 
      if not procList[i].is_alive() and joinFlag[i] == 0: 
       procList[i].join() 
       all_results.append(outQueueList[i].get()) 
       joinFlag[i] = 1 # in case of duplicating result of joined subprocess 
       if jobCount < maxJobs: 
        p, q = initialSubProc(targetFunc, procArgs, jobCount) 
        procList[i] = p 
        outQueueList[i] = q 
        jobCount += 1 
        joinFlag[i] = 0 
    return jobCount 

if __name__ == '__main__': 
    st = gct() 
    d = {i:i**2 for i in xrange(10000000)} 
    print "MainProcess create data dict\n%s" % getMemConsumption() 
    print 'Time to create dict: %s\n\n' % costTime(gct()-st) 

    nproc = 2 
    jobs = 8 
    subProcReturnDictLen = 1000 
    procArgs = [d, subProcReturnDictLen] 

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %d\n" % (nproc, jobs) 
    st = gct() 
    pool = Pool(processes = nproc) 
    for i in xrange(jobs): 
     procArgs.append(i) 
     sp = pool.apply_async(f_pool, tuple(procArgs)) 
     procArgs.pop(2) 
     res = sp.get() 
     if res[0] == 1: 
      # do something with the result 
      pass 
     else: 
      # do something with subprocess exception handle 
      pass 
    pool.close() 
    pool.join() 
    print "Total time used to finish all jobs: %s" % costTime(gct()-st) 
    print "Main Process\n", getMemConsumption(), '\n' 

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %d\n" % (nproc, jobs) 
    st = gct() 
    procList = [] 
    outQueueList = [] 
    all_results = [] 
    jobCount = 0 
    joinFlag = [] 
    while (jobCount < jobs): 
     jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount, 
            jobs, f_proc, procArgs, joinFlag, all_results) 
    for i in xrange(nproc): 
     if joinFlag[i] == 0: 
      procList[i].join() 
      all_results.append(outQueueList[i].get()) 
      joinFlag[i] = 1 
    for i in xrange(jobs): 
     res = all_results[i] 
     if res[0] == 1: 
      # do something with the result 
      pass 
     else: 
      # do something with subprocess exception handle 
      pass 
    print "Total time used to finish all jobs: %s" % costTime(gct()-st) 
    print "Main Process\n", getMemConsumption() 

は結果である:

MainProcess create data dict 
process ID 21256. 
Memory usage: 0.841743 GB 
Time to create dict: 00:00:02 


Use multiprocessing.Pool, max subprocess = 2, jobs = 8 

process ID 21266. 
Memory usage: 1.673084 GB 
process ID 21267. 
Memory usage: 1.673088 GB 
process ID 21266. 
Memory usage: 2.131172 GB 
process ID 21267. 
Memory usage: 2.131172 GB 
process ID 21266. 
Memory usage: 2.176079 GB 
process ID 21267. 
Memory usage: 2.176083 GB 
process ID 21266. 
Memory usage: 2.176079 GB 
process ID 21267. 
Memory usage: 2.176083 GB 

Total time used to finish all jobs: 00:00:49 
Main Process 
process ID 21256. 
Memory usage: 0.843079 GB 


Use multiprocessing.Process, max subprocess = 2, jobs = 8 

process ID 23405. 
Memory usage: 0.840614 GB 
process ID 23408. 
Memory usage: 0.840618 GB 
process ID 23410. 
Memory usage: 0.840706 GB 
process ID 23412. 
Memory usage: 0.840805 GB 
process ID 23415. 
Memory usage: 0.840900 GB 
process ID 23417. 
Memory usage: 0.840973 GB 
process ID 23419. 
Memory usage: 0.841061 GB 
process ID 23421. 
Memory usage: 0.841152 GB 

Total time used to finish all jobs: 00:00:00 
Main Process 
process ID 21256. 
Memory usage: 0.843781 GB 

multiprocessing.Poolからサブプロセスがで1.6ギガバイト程度必要とする理由私にはわかりませんマルチプロセスからのサブプロセスです。プロセスは、メインプロセスのメモリコストに等しい0.84 GBしか必要としません。必要なすべてのジョブの時間が1秒未満であるため、マルチプロセッシングだけがLinuxの「コピーオンライト」の利点を享受しているようです。私はなぜマルチプロセッシングを知りません.Poolはこれを楽しめません。ソースコードから、multiprocessing.Poolはマルチプロセッシングのラッパーのようです。プロセス。

答えて

0

質問:multiprocessing.Poolからサブプロセスが
、初めに1.6ギガバイト程度必要とする理由私にはわからない...プールはmultiprocessing.Process

これのラッパーのように思えますは、すべてのジョブの結果としてPoolが予約されています。
第二に、PoolSimpleQueue()Threadsを使用しています。
第3に、Poolはすべてargvデータを渡してからprocessに渡します。

あなたprocessの例では、彼らがそうであるようにargvを渡し、すべてのためだけ Queue()を使用しています。

Poolは、単なるラッパーにすぎません。

+0

ありがとうございます。 2つのキューを使用して結果と入力の両方を逆転させることは今や理にかなっています。小さな修正で、 'Pool'は' multiprocessing.Manager()。Queue'の代わりに 'multiprocessing.Queue'を使います。 – Finix

関連する問題