2017-12-31 99 views
0

multprocess.Pool.apply_asyncで動作する単一のファイルにログを取得できません。 Logging Cookbookからthisの例を適用しようとしていますが、multiprocessing.Processの場合にのみ機能します。ログキューをapply_asyncに渡すことは効果がないようです。 プールを使用して、同時スレッド数を簡単に管理できるようにしたいと考えています。マルチプロセスで単一ファイルにログする方法.Pool.apply_async

次のマルチプロセッシングプロセスの例は、メインプロセスからログメッセージを受け取っていないことを除いて、私にとってはうまくいきます。100個の大きなジョブがある場合はうまく動作しないと思います。 multiprocessing.Manager、multiprocessing.Queue、multiprocessing.get_logger、apply_async.getを使用して、私は多くのわずかな変化を試みた

def main_with_pool(): 
    start_time = time.time() 
    queue = multiprocessing.Queue(-1) 
    listener = multiprocessing.Process(target=listener_process, 
             args=(queue, listener_configurer)) 
    listener.start() 
    pool = multiprocessing.Pool(processes=3) 
    job_list = [np.random.randint(10)/2 for i in range(10)] 
    single_thread_time = np.sum(job_list) 
    for i, sleep_time in enumerate(job_list): 
     name = str(i) 
     pool.apply_async(worker_function, 
         args=(sleep_time, name, queue, worker_configurer)) 

    queue.put_nowait(None) 
    listener.join() 
    end_time = time.time() 
    print("Script execution time was {}s, but single-thread time was {}s".format(
     (end_time - start_time), 
     single_thread_time 
    )) 

if __name__ == "__main__": 
    main_with_pool() 

import logging 
import logging.handlers 
import numpy as np 
import time 
import multiprocessing 
import pandas as pd 
log_file = 'PATH_TO_FILE/log_file.log' 

def listener_configurer(): 
    root = logging.getLogger() 
    h = logging.FileHandler(log_file) 
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s') 
    h.setFormatter(f) 
    root.addHandler(h) 

# This is the listener process top-level loop: wait for logging events 
# (LogRecords)on the queue and handle them, quit when you get a None for a 
# LogRecord. 
def listener_process(queue, configurer): 
    configurer() 
    while True: 
     try: 
      record = queue.get() 
      if record is None: # We send this as a sentinel to tell the listener to quit. 
       break 
      logger = logging.getLogger(record.name) 
      logger.handle(record) # No level or filter logic applied - just do it! 
     except Exception: 
      import sys, traceback 
      print('Whoops! Problem:', file=sys.stderr) 
      traceback.print_exc(file=sys.stderr) 


def worker_configurer(queue): 
    h = logging.handlers.QueueHandler(queue) # Just the one handler needed 
    root = logging.getLogger() 
    root.addHandler(h) 
    # send all messages, for demo; no other level or filter logic applied. 
    root.setLevel(logging.DEBUG) 


# This is the worker process top-level loop, which just logs ten events with 
# random intervening delays before terminating. 
# The print messages are just so you know it's doing something! 
def worker_function(sleep_time, name, queue, configurer): 
    configurer(queue) 
    start_message = 'Worker {} started and will now sleep for {}s'.format(name, sleep_time) 
    logging.info(start_message) 
    time.sleep(sleep_time) 
    success_message = 'Worker {} has finished sleeping for {}s'.format(name, sleep_time) 
    logging.info(success_message) 

def main_with_process(): 
    start_time = time.time() 
    single_thread_time = 0. 
    queue = multiprocessing.Queue(-1) 
    listener = multiprocessing.Process(target=listener_process, 
             args=(queue, listener_configurer)) 
    listener.start() 
    workers = [] 
    for i in range(10): 
     name = str(i) 
     sleep_time = np.random.randint(10)/2 
     single_thread_time += sleep_time 
     worker = multiprocessing.Process(target=worker_function, 
             args=(sleep_time, name, queue, worker_configurer)) 
     workers.append(worker) 
     worker.start() 
    for w in workers: 
     w.join() 
    queue.put_nowait(None) 
    listener.join() 
    end_time = time.time() 
    final_message = "Script execution time was {}s, but single-thread time was {}s".format(
     (end_time - start_time), 
     single_thread_time 
    ) 
    print(final_message) 

if __name__ == "__main__": 
    main_with_process() 

しかし、私は、次の適応が仕事を得ることができません()、しかし、何も動かされていない。

私はこのための既製のソリューションがあると思います。代わりにセロリを試してみるべきですか?

ありがとう

+0

この問題は、torekのアドバイスに従って解決しました。私はgithub上で動作する[例](https://github.com/ClayCampaigne/multiprocessing-pool-logging/blob/master/pool_logging.py)を持っています。 – GrayOnGray

答えて

1

あり絡み合っているここでは2つの別々の問題、実際には、次のとおりです。

  • あなたは(あなたが直接起動する労働者に渡すことができますプールベースの関数の引数としてmultiprocessing.Queue()オブジェクトを渡すことはできませんが、それ以前の "それ以上の"ものではない)。
  • Noneをリスナープロセスに送信する前に、すべての非同期ワーカーが完了するまで待つ必要があります。

    queue = multiprocessing.Queue(-1) 
    

    :管理者が管理するQueue()インスタンスとして

    queue = multiprocessing.Manager().Queue(-1) 
    

    通過することができ

は交換してください、最初のものを修正するには。

pool.close() 
pool.join() 
queue.put_nowait(None) 

以上の複合体を::

getters = [] 
for i, sleep_time in enumerate(job_list): 
    name = str(i) 
    getters.append(
     pool.apply_async(worker_function, 
        args=(sleep_time, name, queue, worker_configurer)) 
    ) 
while len(getters): 
    getters.pop().get() 
# optionally, close and join pool here (generally a good idea anyway) 
queue.put_nowait(None) 

(あなたのいずれか、二を修正するなど、各非同期呼び出しからそれぞれの結果を収集し、またはプールを閉鎖し、それを待つために

put_nowaitputの待機中のバージョンに置き換えて、無制限の長さのキューを使用しないことも検討してください。)

+0

ありがとう!私は残っている問題を抱えています。つまり、作業者が作業を取り上げるたびに、各ロギングイベントごとに送信するインテンシカルなメッセージの数が1ずつ増えます。 'worker_configurer'関数でハンドラを追加する前に' if not len(root.handlers): '条件を追加しようとしましたが、何もしませんでした。私は現在、この問題[マルチプロセッシングの狂気のログ](https://stackoverflow.com/questions/20332359/logging-with-multiprocessing-madness)に相談しています。 (私は 'Process'設定からコードを適用しているので、私の関数のいくつかが誤った名前になっていることに気がつきました。) – GrayOnGray

+0

キーワード引数' maxtasksperchild = 1'を使ってプールを初期化して残った問題を解決しました。各タスクが新しいワーカーを生成します。この問題の類似性にもかかわらず、私は[マルチプロセッシングの狂気とのロギング](https://stackoverflow.com/questions/20332359/logging-with-multiprocessing-madness)とその答えの中で使用価値のあるものは何も見つかりませんでした。 – GrayOnGray

+1

基本的には少し難解です。特に移植性が必要な場合は、各プロセスは元の親プロセスとは独立しているため、Windows以外のシステムでは、各プロセスは元の親プロセス(ロガー構成も含む)から開始します。ロギングモジュールは、ルートロギングインスタンスのための一種のシングルトンとして動作するように設計されています。一方、ハンドラはスタック可能であり、したがってシングルトンではありません。だから、あなたは一人一人のために夢中になる可能性のあるセットで終わります。ワンポイントからのロサンゼルスの回答 - ロサンゼルスの頭をまっすぐに保つのがずっと簡単です。 – torek

1

2つのキューを使用することを検討してください。最初のキューは、作業者のデータを格納する場所です。ジョブ完了後の各ワーカーは、結果を2番目のキューにプッシュします。この2番目のキューを消費して、ログをファイルに書き込みます。

+0

これは理解しやすく管理しやすいようです。ロギングとマルチプロセッシングのやりとりを理解することは確実ではありませんが、最終的にはかなり長時間のマルチパートアルゴリズムを並列化しようとしているので別の用途のために作成されており、そのアルゴリズムのさまざまな部分に固有のログを保存したいと考えています。 – GrayOnGray

関連する問題