2017-03-02 8 views
4

私はPythonのthreadingモジュールを学んでいますし、私自身がQueue.join()はなぜここに必要ですか?

from Queue import Queue 
import threading 

lock = threading.Lock() 
MAX_THREADS = 8 
q = Queue() 
count = 0 

# some i/o process 
def io_process(x): 
    pass 

# process that deals with shared resources 
def shared_resource_process(x): 
    pass 

def func(): 
    global q, count 
    while not q.empty(): 
     x = q.get() 
     io_process(x) 
     if lock.acquire(): 
      shared_resource_process(x) 
      print '%s is processing %r' %(threading.currentThread().getName(), x) 
      count += 1   
      lock.release() 

def main(): 
    global q 
    for i in range(40): 
     q.put(i) 

    threads = [] 
    for i in range(MAX_THREADS): 
     threads.append(threading.Thread(target=func)) 

    for t in threads: 
     t.start() 

    for t in threads: 
     t.join() 

    print 'multi-thread done.' 
    print count == 40 

if __name__ == '__main__': 
    main() 

を理解するために、次のコードを書いていると、出力は次のように捕まってしまった:メインでプリントは()ではないことを

Thread-1 is processing 32 
Thread-8 is processing 33 
Thread-6 is processing 34 
Thread-2 is processing 35 
Thread-5 is processing 36 
Thread-3 is processing 37 
Thread-7 is processing 38 
Thread-4 is processing 39 

注意いくつかのスレッドがハングしている/ブロックしていることを意味する?私が読ん

Thread-6 is processing 36 
Thread-4 is processing 37 
Thread-3 is processing 38 
Thread-7 is processing 39 
multi-thread done. 
True 

Process finished with exit code 0 

それから私は)(q.task_doneを追加することにより、FUNC()メソッドを変更します。

if lock.acquire(): 
      shared_resource_process(x) 
      print '%s is processing %r' %(threading.currentThread().getName(), x) 
      count += 1 
      q.task_done() # why is this necessary ? 
      lock.release() 

を、今すべてのスレッドは私が期待通りに終了し、右の出力を得ますQueue.Queue hereのドキュメントで、queueのすべてのアイテムが処理されていることを確認するために、task_done()がqueue.join()で動作することを確認してください。 main()の中でqueue.join()を呼び出さなかったので、なぜfunc()にtask_done()が必要なのですか? task_done()コードが見つからないときにスレッドがハングしたりブロックされたりする原因は何ですか?

答えて

3

コードに競合状態があります。

  1. スレッドAは、それが空だかいないかどうかを確認するためにq.emptyを呼び出します。あなたはQueueに残された唯一つの項目があり、イベントのシーケンス以下のその後の2つだけのスレッドの代わりに、8を使用しているだろうが起こることを想像してみてキューには1つの項目があるので、結果はFalseであり、ループ本体が実行されます。
  2. スレッドAがq.getを呼び出す前に、コンテキストスイッチがあり、スレッドBが実行されます。
  3. スレッドBはq.emptyを呼び出しますが、キューにはまだ1つの項目があり、結果はFalseであり、ループ本体が実行されます。
  4. スレッドBはパラメータなしでq.getを呼び出し、直ちに最後の項目をキューから返します。その後、スレッドBがアイテムを処理し、q.emptyTrueを返すので、終了します。
  5. スレッドAが実行されます。ステップ1ですでにq.emptyと呼ばれているので、次にq.getと呼ぶが、これは永遠にブロックされ、プログラムは終了しない。

あなたはtimeをインポートし、ループを少し変更することにより、上記の動作をシミュレートすることができますtask_doneが呼び出されているかどうか

while not q.empty(): 
    time.sleep(0.1) # Force context switch 
    x = q.get() 

注意はその動作に関係なく同じではありません。

なぜtask_doneを追加したのですか?デフォルトでは、Python 2は100個のインタプリタの指示ごとにコンテキストスイッチを行います。そのため、コードを追加するとコンテキストスイッチが発生する場所が変更されている可能性があります。説明のため、another questionlinked PDFを参照してください。私のマシンでは、task_doneがあったかどうかにかかわらず、プログラムはハングしませんでした。これは、あなたのために何が起こったのかという推測です。

動作を修正したい場合は、無限ループとパラメータをブロックgetに渡すだけで、ブロックしないように指示できます。これは最終的にあなたがキャッチして、ループを破ることができるQueue.Empty例外をスローするgetが発生します。あなたの答えのための

from Queue import Queue, Empty 

def func(): 
    global q, count 
    while True: 
     try: 
      x = q.get(False) 
     except Empty: 
      break 
     io_process(x) 
     if lock.acquire(): 
      shared_resource_process(x) 
      print '%s is processing %r' %(threading.currentThread().getName(), x) 
      count += 1 
      lock.release() 
+0

おかげで、それは明らかだと本当に役立ちます – vansdev

関連する問題