2017-09-12 21 views
2

私は以下のようなPythonプログラムを持っています。Pythonマルチプロセッシングキューを使ったデッドロック

from multiprocessing import Lock, Process, Queue, current_process 
import time 

lock = Lock() 


def do_job(tasks_to_accomplish, tasks_that_are_done): 
    while not tasks_to_accomplish.empty(): 
     task = tasks_to_accomplish.get() 
     print(task) 
     lock.acquire() 
     tasks_that_are_done.put(task + ' is done by ' + current_process().name) 
     lock.release() 
     time.sleep(1) 
    return True 


def main(): 
    number_of_task = 10 
    number_of_processes = 4 
    tasks_to_accomplish = Queue() 
    tasks_that_are_done = Queue() 
    processes = [] 

    for i in range(number_of_task): 
     tasks_to_accomplish.put("Task no " + str(i)) 

    # creating processes 
    for w in range(number_of_processes): 
     p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done)) 
     processes.append(p) 
     p.start() 


    # completing process 
    for p in processes: 
     p.join() 

    # print the output 
    while not tasks_that_are_done.empty(): 
     print(tasks_that_are_done.get()) 

    return True 


if __name__ == '__main__': 
    main() 

時にはプログラムが完全に実行されることがありますが、ときどき停止して完了しないことがあります。手動で終了すると、次のエラーが発生します。

$ python3 multiprocessing_example.py 
Task no 0 
Task no 1 
Task no 2 
Task no 3 
Task no 4 
Task no 5 
Task no 6 
Task no 7 
Task no 8 
Task no 9 
^CProcess Process-1: 
Traceback (most recent call last): 
    File "multiprocessing_example.py", line 47, in <module> 
    main() 
    File "multiprocessing_example.py", line 37, in main 
    p.join() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 121, in join 
    res = self._popen.wait(timeout) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_fork.py", line 51, in wait 
    return self.poll(os.WNOHANG if timeout == 0.0 else 0) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_fork.py", line 29, in poll 
    pid, sts = os.waitpid(self.pid, flag) 
KeyboardInterrupt 
Traceback (most recent call last): 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap 
    self.run() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run 
    self._target(*self._args, **self._kwargs) 
    File "multiprocessing_example.py", line 9, in do_job 
    task = tasks_to_accomplish.get() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/queues.py", line 94, in get 
    res = self._recv_bytes() 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes 
    buf = self._recv_bytes(maxlength) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes 
    buf = self._recv(4) 
    File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 379, in _recv 
    chunk = read(handle, remaining) 
KeyboardInterrupt 

誰かがプログラムの問題点を教えていただけますか?私はPython 3.6を使用しています。

答えて

1

LockQueueの周りに必要とされていません。

lock.acquire() 
    tasks_that_are_done.put(task + ' is done by ' + current_process().name) 
    lock.release() 

Queue
このモジュールのキュークラスは、必要なすべてのロックセマンティクスを実装します。


質問:...プログラムの問題は何ですか? empty()国家が到達したget() まで変更しないという保証はありませんので

あなたはjoin()を呼び出すにデッドロックQueue.empty()Queue.get()、 なリードを使用しています。

がちなデッドロック:

while not tasks_to_accomplish.empty(): 
    task = tasks_to_accomplish.get() 

代わりのempty/getを使用しては、ペアは、例えば使用:更新の

import queue 
while True: 
    try: 
     task = tasks_to_accomplish.get_nowait() 
    except queue.Empty: 
     break 
    else: 
     # Handle task here 
     ... 
     tasks_to_accomplish.task_done() 
+0

おかげで、あなたは私の問題を修正しました。 – Pankaj

関連する問題