2017-06-28 5 views
0

問題:マシン上で大量のデータ処理を開始するためのDAG(Directed-Acyclic-Graph)構造があります。プロセスのいくつかは、親データ処理が完了したときにのみ開始できます。これは、複数レベルの処理があるためです。私はPythonマルチプロセッシングライブラリを使用して、最初の目標として1つのマシン上ですべてを処理し、後でマネージャを使用して異なるマシンで実行するようにスケールします。私は、Pythonのマルチプロセッシングに関する以前の経験はありません。誰もがそれが良いライブラリであることを提案することはできますか?はいの場合、いくつかの基本的な実装のアイデアはうまくいくでしょう。もしそうでなければ、Pythonでこれを行うために他に何が使えますか?Pythonマルチプロセッシングを使用して非同期で多数の依存プロセスを開始する

例:

A - > B

B - > D、E、F、G

C - > D

上記の例ではiが&をキックしますCの最初の(並列)、実行が成功した後、残りのプロセスはBが最初に終了するのを待つだけです。 Bの実行が終了すると、他のすべてのプロセスが開始されます。

P .:私は例を使って明らかにしようとしましたが、私は実際のデータを機密扱いで共有することはできません。

答えて

1

私はこのようなことにプロセスとキューを使用することの大きなファンです。そのよう

良い例だ
from multiprocessing import Process, Queue 
from Queue import Empty as QueueEmpty 
import time 

#example process functions 
def processA(queueA, queueB): 
    while True: 
     try: 
      data = queueA.get_nowait() 
      if data == 'END': 
       break 
     except QueueEmpty: 
      time.sleep(2) #wait some time for data to enter queue 
      continue 
     #do stuff with data 
     queueB.put(data) 

def processA(queueB, _): 
    while True: 
     try: 
      data = queueB.get_nowait() 
      if data == 'END': 
       break 
     except QueueEmpty: 
      time.sleep(2) #wait some time for data to enter queue 
      continue 
     #do stuff with data 

#helper functions for starting and stopping processes 
def start_procs(num_workers, target_function, args): 
    procs = [] 
    for _ in range(num_workers): 
     p = Process(target=target_function, args=args) 
     p.start() 
     procs.append(p) 
    return procs 

def shutdown_process(proc_lst, queue): 
    for _ in proc_lst: 
     queue.put('END') 
    for p in proc_lst: 
     try: 
      p.join() 
     except KeyboardInterrupt: 
      break 

queueA = Queue(<size of queue> * 3) #needs to be a bit bigger than actual. 3x works well for me 
queueB = Queue(<size of queue>) 
queueC = Queue(<size of queue>) 
queueD = Queue(<size of queue>) 

procsA = start_procs(number_of_workers, processA, (queueA, queueB)) 
procsB = start_procs(number_of_workers, processB, (queueB, None)) 

# feed some data to processA 
[queueA.put(data) for data in start_data] 

#shutdown processes 
shutdown_process(procsA, queueA) 
shutdown_process(procsB, queueB) 

#etc, etc. You could arrange the start, stop, and data feed statements to arrive at the dag behaviour you desire 
+0

。しかし、データが持つことができる依存関係の量を考えると、キューを持つのは良い考えですか?私が考えることは、労働者のプールはどんな提案よりも良いはずですか? –

+0

あなたが並行性を気にしない限り、... – Sebastian

+0

DAGの場合、1つのプロセスは複数の親プロセスを持つことができます。それは、私がプロセスごとに別々のキューを維持する妥当性を疑う。正しく動作しないことがあります。時々、プログラムはすべての親が1つだけでなく終えるのを待たなければならないでしょう。 –

関連する問題