2016-08-17 19 views
0

と入力ファイルのリスト:Pythonのマルチプロセッシング:プロセス私は次のことを行うためにPythonのマルチプロセッシングを使用したいエラーログ

  • プロセス入力の長いリストが
  • をファイルは
  • セットAエラーログを含めます同時使用CPUコア数(プロセス数)

python logging cookbookには、2つの優れたマルチプロセッシングの例があります。以下のコードでは、マルチプロセッシングを使用する2番目のメソッド(「別のスレッドでメインプロセスにログインする」)を変更しました.Queue。私自身も新しいユーザーでも、詳細な注釈を追加し、サンプルの入出力ファイルを作成しました。

私が立ち往生しているところは、コードが私のリストの項目の数ではなく、CPUコアの数を反復することです。

同時プロセス数の上限を超えないで、すべての入力ファイルにこの機能を適用するにはどうすればよいですか?

import json 
import logging 
import multiprocessing 
import numpy as np 
import os 
import pandas as pd 
import threading 
import time 

def create_10_infiles(): 
    """Creates 10 csv files with 4x4 array of floats, + occasional strings""" 
    list_csv_in = [] 
    for i in range(1,11): 
     csv_in = "{:02d}_in.csv".format(i) 
     # create a 4 row, 4 column dataframe with random values centered around i 
     df = pd.DataFrame(np.random.rand(16).reshape(4,4) * i) 
     # add a string to one of the arrays (as a reason to need error logging) 
     if i == 2 or i == 8: 
      df.loc[2,2] = "Oops, array contains a string. Welcome to data science." 
     # save to csv, and append filename to list of inputfiles 
     df.to_csv(csv_in) 
     list_csv_in.append(csv_in) 
    return list_csv_in 

def logger_thread(queue): 
    """Listener process that logs output received from other processes?""" 
    while True: 
     record = queue.get() 
     if record is None: 
      break 
     logger = logging.getLogger(record.name) 
     logger.handle(record) 

def worker_process(queue, infile): 
    """Worker process that used to run tasks. 
    Each process is isolated, so it starts by setting up logging.""" 
    # set up a handle to hold the logger output? 
    queue_handle = logging.handlers.QueueHandler(queue) 
    # creates a new logger called "process logger" (printed in each line) 
    logger = logging.getLogger("process logger") 
    # sets the logging level to DEBUG, so logger.info messages are printed. 
    logger.setLevel(logging.DEBUG) 
    # connects logger to handle defined above? 
    logger.addHandler(queue_handle) 
    # here you can run your desired program, in the hope that the time saved from parallel 
    # processing is greater than the overhead of setting up all those processes and loggers:) 
    normalise_array_to_mean_and_save(infile, logger) 

def normalise_array_to_mean_and_save(csv_in, logger): 
    """Opens csv with array, checks dtypes, calculates mean, saves output csv.""" 
    # check if file exists 
    if os.path.isfile(csv_in): 
     # open as pandas dataframe 
     df = pd.read_csv(csv_in) 
     # if none of the columns contain mixed datatypes (i.e, a string) 
     if not pd.np.dtype('object') in df.dtypes.tolist(): 
      # calc mean over whole dataframe 
      mean = df.stack().mean() 
      logger.info("{}, Mean = {:0.2f}".format(csv_in, mean)) 
      # normalise all values to mean. Save as "01_out.csv", "02_out.csv" etc 
      df = df/mean 
      csv_out = csv_in[:-6] + "out.csv" 
      df.to_csv(csv_out) 
     else: 
      logger.info("{}, Mean not calculated. Non-float values found.".format(csv_in)) 

if __name__ == '__main__': 
    os.chdir(r"D:\data") 
    # import your favourite json logging settings (collapsed for brevity) 
    logsettings = json.dumps({"version": 1, "root": {"handlers": ["console", "file"], "level": "DEBUG"}, "formatters": {"detailed": {"class": "logging.Formatter", "format": "%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s"}}, "handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}, "file": {"mode": "w", "formatter": "detailed", "class": "logging.FileHandler", "filename": "my_multiprocessing_logfile.log"}}}) 
    config = json.loads(logsettings) 
    # replace default logfile with a filename containing the exact time 
    config['handlers']['file']['filename'] = time.strftime("%Y%m%d_%H_%M_%S") + "_mp_logfile.txt" 
    # load the logging settings 
    logging.config.dictConfig(config) 

    queue = multiprocessing.Queue() 
    workers = [] 
    # set the number of concurrent processes created (i.e. CPU cores used) 
    num_processes = 4 

    # create 10 csv files with data, and return the list of filepaths 
    list_10_infiles = create_10_infiles() 

    # set up a process for each CPU core (e.g. 4) 
    for i in range(num_processes): 
     wp = multiprocessing.Process(target=worker_process, 
            name='worker_{}'.format(i+1), 
            args=(queue, list_10_infiles[i])) 
     workers.append(wp) 
     wp.start() 

    # set up a thread as the logger_process 
    logger_process = threading.Thread(target=logger_thread, args=(queue,)) 
    logger_process.start() 

    #At this point, the main process could do some useful work of its own 
    #Once it's done that, it can wait for the workers to terminate... 
    for wp in workers: 
     wp.join() 

    # set logger for main process if desired 
    root = logging.getLogger("main") 
    root.setLevel(logging.DEBUG) 
    logger = logging.getLogger("main logger") 
    logger.info("CPUs used = {}/{}".format(num_processes, multiprocessing.cpu_count())) 
    logger.info('Program is finished. All files analysed.') 

    # And now tell the logging thread to finish up, too 
    queue.put(None) 
    logger_process.join() 

注:CPUコアの数に応じて、入力ファイルのリストをチャンクに分割しようとしました。これでファイルは処理されましたが、非常に遅いです。

答えて

0

私は、キューの代わりにPythonマルチプロセッシングプールを使用すると、ファイルの長いリストを処理し、同時コアの数を制限することができたことがわかりました。

ロギングはプールと互換性がありませんが、戻り値を収集することが可能であることがわかりました。戻り値は、コードが例外をスローしないと仮定して、すべてのファイルが処理された後にログに記録できます。

誰かが私にもっと優雅な解決策を与えてくれるかもしれませんが、今のところ問題が解決しています。

from multiprocessing import Pool 
from time import strftime 
import logging 

def function_to_process_files(file): 
    #..check file integrity, etc.. 
    if file_gives_an_error: 
     return "{} file {} gave an error".format(strftime("%Y%m%d_%H_%M_%S"), file) 
    #..do stuff without using the logging module.. 
    #.. for slow, irregular processes, printing to console is possible.. 
    return "{} file {} processed correctly".format(strftime("%Y%m%d_%H_%M_%S"), file) 

if __name__ == "__main__": 

    list_of_files_to_process = define_your_file_list_somehow() 

    logging = logging.setup_regular_logging_to_file_as_desired() 

    # define the number of CPU cores to be used concurrently 
    n_processes = 4 

    with Pool(processes=n_processes) as pool: 
     list_of_return_statements = pool.map(function_to_process_files, list_of_files_to_process) 
    # now transfer the list of return statements to the logfile 
    for return_statement in list_of_return_statements: 
     logging.info(return_statement) 
関連する問題