2017-10-14 5 views
1

入力を受け取り処理するいくつかのプロセスを設定したいので、この結果の結果を処理したい別のタスクにします。本質的に、各タスクは(同じタイプの)ゼロまたは複数の新しいタスクをもたらし、最終的にすべてのタスクは新しいタスクを生成しない。すべてのプロセスがキューから取得しようとしていてキューが空の場合の処理​​を終了しますか?

私はキューがこれに適していると思ったので、入力キューと結果キューを追加して、何も新しい結果にならないタスクを追加しました。いつでもキューは空でもかまいませんが、別のプロセスがタスクを処理している場合は、キューを追加できます。

したがって、私はすべてのプロセスが同時に入力キューから取得しようとしているときに終了します。

私は、一般に、Pythonのマルチプロセッシングとマルチプロセッシングの両方に全く新しいものです。

class Consumer(Process): 
    def __init__(self, name): 
     super().__init__(name=name) 

    def run(): 
     # This is where I would have the task try to get a new task off of the 
     # queue and then calculate the results and put them into the queue 
     # After which it would then try to get a new task and repeat 

     # If this an all other processes are trying to get and the queue is 
     # empty That is the only time I know that everything is complete and can 
     # continue 
     pass 

def start_processing(): 
    in_queue = Queue() 
    results_queue = Queue() 
    consumers = [Consumer(str(i)) for i in range(cpu_count())] 

    for i in consumers: 
     i.start() 

    # Wait for the above mentioned conditions to be true before continuing 
+0

この時点で、これは私が前方に移動する前に作業する必要があるもののように思えるので、私は唯一の絶対的なスケルトンコードを持っています。基本的に私はプロセスとキューを作成しました。 –

+0

概要コードをいくつか追加しました –

答えて

1

JoinableQueueは、この目的に合うように設計されています。私が何を意味するかの基本的な概要を追加するために編集

JoinableQueueに参加すると、進行中のタスクがあるまでブロックされます。

次のように使用できます。メインプロセスでは、特定の量のワーカープロセスが割り当てられ、JoinableQueueが割り当てられます。ワーカープロセスは、キューを使用して新しいタスクを生成し、消費します。メインプロセスは、それ以上タスクが進行しなくなるまで待ち行列に参加することで待機します。その後、ワーカープロセスを終了して終了します。

非常に単純化した例(擬似コード):

def consumer(queue): 
    for task in queue.get(): 
     results = process_task(task) 

     if 'more_tasks' in results: 
      for new_task in results['more_tasks']: 
       queue.put(new_task) 

     # signal the queue that a task has been completed 
     queue.task_done() 

def main(): 
    queue = JoinableQueue() 

    processes = start_processes(consumer, queue) 

    for task in initial_tasks: 
     queue.put(task) 

    queue.join() # block until all work is done 

    terminate_processes(processes) 
関連する問題