2017-01-05 9 views
0

私はランダムなバグを与えているスレッドプールを実行しています。ときどき動作しますが、時にはこのコードのpool.join部分にはまってしまいます。私はこの数日間ここにいましたが、それが動作しているかどうか、またはそれが固まったときには何の違いも見つけられません。スレッド結合でスタックになる

ここでコードがあります助けてください... ...

def run_thread_pool(functions_list): 

    # Make the Pool of workers 
    pool = ThreadPool() # left blank to default to machine number of cores 

    pool.map(run_function, functions_list) 

    # close the pool and wait for the work to finish 
    pool.close() 
    pool.join() 
    return 

は同様に、このコードは、また、ランダムに(q.joinで立ち往生されています

def run_queue_block(methods_list, max_num_of_workers=20): 
    from views.console_output_handler import add_to_console_queue 

    ''' 
    Runs methods on threads. Stores method returns in a list. Then outputs that list 
    after all methods in the list have been completed. 

    :param methods_list: example ((method name, args), (method_2, args), (method_3, args) 
    :param max_num_of_workers: The number of threads to use in the block. 
    :return: The full list of returns from each method. 
    ''' 

    method_returns = [] 

    log = StandardLogger(logger_name='run_queue_block') 

    # lock to serialize console output 
    lock = threading.Lock() 

    def _output(item): 
     # Make sure the whole print completes or threads can mix up output in one line. 
     with lock: 
      if item: 
       add_to_console_queue(item) 
      msg = threading.current_thread().name, item 
      log.log_debug(msg) 

     return 

    # The worker thread pulls an item from the queue and processes it 
    def _worker(): 
     log = StandardLogger(logger_name='_worker') 

     while True: 
      try: 
       method, args = q.get() # Extract and unpack callable and arguments 

      except: 
       # we've hit a nonetype object. 
       break 

      if method is None: 
       break 

      item = method(*args) # Call callable with provided args and store result 
      method_returns.append(item) 
      _output(item) 

      q.task_done() 

    num_of_jobs = len(methods_list) 

    if num_of_jobs < max_num_of_workers: 
     max_num_of_workers = num_of_jobs 

    # Create the queue and thread pool. 
    q = Queue() 

    threads = [] 
    # starts worker threads. 
    for i in range(max_num_of_workers): 
     t = threading.Thread(target=_worker) 
     t.daemon = True # thread dies when main thread (only non-daemon thread) exits. 
     t.start() 
     threads.append(t) 

    for method in methods_list: 
     q.put(method) 

    # block until all tasks are done 
    q.join() 

    # stop workers 
    for i in range(max_num_of_workers): 
     q.put(None) 
    for t in threads: 
     t.join() 

    return method_returns 

それが起こったときに、私が知っていることはありません動作するように。それはほとんどの時間を動作しますが、ほとんどの時間が十分ではありません。おそらく、このような不具合が発生する可能性がありますか?

+0

'メソッドに' break'を実行すると、タスクはキューに残っていますか?>キューは決して空にならないので参加しませんか? – snakecharmerb

+0

'concurrent.futures.ThreadPoolExecutor'には' close'と 'join'メソッドはありません。 –

+0

どのようにスレッドを閉じ、すべてのスレッドが完了するのを待ちますか? – Emily

答えて

1

あなたはconcurrent.futures.ThreadPoolExecutorオブジェクトにshutdownを呼び出す必要があります。その後、returnの結果はpool.mapです。

def run_thread_pool(functions_list): 

    # Make the Pool of workers 
    pool = ThreadPool() # left blank to default to machine number of cores 

    result = pool.map(run_function, functions_list) 

    # close the pool and wait for the work to finish 
    pool.shutdown() 
    return result 

私はQueue対象とデーモンThreadせずに、あなたのコードを簡素化しました。あなたの要件に合っているかどうかを確認してください。

def run_queue_block(methods_list): 
    from views.console_output_handler import add_to_console_queue 

    ''' 
    Runs methods on threads. Stores method returns in a list. Then outputs that list 
    after all methods in the list have been completed. 

    :param methods_list: example ((method name, args), (method_2, args), (method_3, args) 
    :param max_num_of_workers: The number of threads to use in the block. 
    :return: The full list of returns from each method. 
    ''' 

    method_returns = [] 

    log = StandardLogger(logger_name='run_queue_block') 

    # lock to serialize console output 
    lock = threading.Lock() 

    def _output(item): 
     # Make sure the whole print completes or threads can mix up output in one line. 
     with lock: 
      if item: 
       add_to_console_queue(item) 
      msg = threading.current_thread().name, item 
      log.log_debug(msg) 

     return 

    # The worker thread pulls an item from the queue and processes it 
    def _worker(method, *args, **kwargs): 
     log = StandardLogger(logger_name='_worker') 


     item = method(*args, **kwargs) # Call callable with provided args and store result 
     with lock: 
      method_returns.append(item) 
     _output(item) 

    threads = [] 
    # starts worker threads. 
    for method, args in methods_list: 
     t = threading.Thread(target=_worker, args=(method, args)) 
     t.start() 
     threads.append(t) 

    # stop workers 
    for t in threads: 
     t.join() 

    return method_returns 
0

2番目の例でキューを結合できるようにするには、すべてのタスクがキューから削除されていることを確認する必要があります。

したがって、_worker関数では、処理できない場合でもタスクを完了としてマークします。そうしないと、キューが空にならず、プログラムがハングします。

関連する問題