- 長い入力ファイルのリストを処理する
- エラーを含むログを1つのファイルに出力する
- 同時に使用するCPUコア数(プロセス数)を設定できる
Python の Logging Cookbook には、マルチプロセッシングの優れた例が2つあります。以下のコードでは、そのうち2番目の方法(「メインプロセス内の別スレッドでログを記録する」)を、`multiprocessing.Queue` を使うように変更しました。私自身と同じような初心者のために、詳細なコメントを追加し、サンプルの入力ファイルと出力ファイルも作成するようにしました。
import json
import logging
import logging.config
import logging.handlers
import multiprocessing
import os
import threading
import time

import numpy as np
import pandas as pd
def create_10_infiles():
    """Create 10 csv files, each holding a 4x4 table of random floats.

    The files are written to the current working directory as
    "01_in.csv" ... "10_in.csv" (with the DataFrame index as the first
    column). In files 02 and 08 one cell is replaced by a string, so that
    processing them gives a reason to log an error.

    Returns:
        list of str: the filenames written, in order.
    """
    list_csv_in = []
    for i in range(1, 11):
        csv_in = "{:02d}_in.csv".format(i)
        # 4 rows x 4 columns of random values, uniform in [0, i)
        df = pd.DataFrame(np.random.rand(16).reshape(4, 4) * i)
        # add a string to one of the arrays (as a reason to need error logging)
        if i in (2, 8):
            # Make the column object dtype first: assigning a string into a
            # float64 column is deprecated in recent pandas versions.
            df[2] = df[2].astype(object)
            df.loc[2, 2] = "Oops, array contains a string. Welcome to data science."
        # save to csv, and append filename to list of inputfiles
        df.to_csv(csv_in)
        list_csv_in.append(csv_in)
    return list_csv_in
def logger_thread(queue):
    """Listener loop that writes out log records sent by worker processes.

    Intended to run in a thread of the main process. Each record taken
    from *queue* is handed to the main process's logger of the same name,
    so it is emitted by the handlers configured there. A ``None`` on the
    queue ends the loop.

    Args:
        queue: queue of logging.LogRecord objects, terminated by None.
    """
    while True:
        record = queue.get()
        # None is the sentinel meaning "no more records": stop listening.
        if record is None:
            break
        logger = logging.getLogger(record.name)
        # handle() does not re-check the logger's level; the level was
        # already applied by the logger in the worker that created it.
        logger.handle(record)
def worker_process(queue, infile):
    """Process one input file in a child process, logging through *queue*.

    Each process is isolated, so it starts by setting up logging: records
    from the "process logger" are put on *queue* by a QueueHandler, and
    the listener thread in the main process writes them out.

    Args:
        queue: the multiprocessing queue read by the main process's listener.
        infile: path of the csv file to process.
    """
    # QueueHandler sends each record to the main process through the queue.
    queue_handle = logging.handlers.QueueHandler(queue)
    # creates (or fetches) the logger called "process logger" (printed in each line)
    logger = logging.getLogger("process logger")
    # DEBUG level, so logger.info / logger.warning messages are all passed on.
    logger.setLevel(logging.DEBUG)
    # Drop any QueueHandler left from an earlier call in this same process,
    # so each record is queued exactly once.
    for handler in list(logger.handlers):
        if isinstance(handler, logging.handlers.QueueHandler):
            logger.removeHandler(handler)
    logger.addHandler(queue_handle)
    # With the "fork" start method the child may inherit the main process's
    # root handlers; without this, records would also be written directly
    # from here, duplicating what the listener writes.
    logger.propagate = False
    # here you can run your desired program, in the hope that the time saved from parallel
    # processing is greater than the overhead of setting up all those processes and loggers:)
    normalise_array_to_mean_and_save(infile, logger)
def normalise_array_to_mean_and_save(csv_in, logger, index_col=0):
    """Divide every value of a csv table by the table's mean and save it.

    The result is written next to the input with "in.csv" replaced by
    "out.csv" (e.g. "01_in.csv" -> "01_out.csv"); for names not ending in
    "in.csv", "_out.csv" is appended to the name without its extension.
    Missing files and tables with any non-numeric column (e.g. a stray
    string) are skipped and reported as warnings through *logger*.

    Args:
        csv_in: path of the input csv file.
        logger: logging.Logger used for progress and error messages.
        index_col: column of the csv used as the row index, and therefore
            excluded from the mean. The default 0 matches files written by
            DataFrame.to_csv with its default index=True; pass None for a
            csv without an index column.
    """
    if not os.path.isfile(csv_in):
        logger.warning("{}, File not found.".format(csv_in))
        return
    df = pd.read_csv(csv_in, index_col=index_col)
    # A string anywhere in a column makes pandas read that column with a
    # non-numeric dtype (object, or string dtype in newer pandas).
    if not all(pd.api.types.is_numeric_dtype(dtype) for dtype in df.dtypes):
        logger.warning("{}, Mean not calculated. Non-float values found.".format(csv_in))
        return
    # calc mean over whole dataframe
    mean = df.stack().mean()
    logger.info("{}, Mean = {:0.2f}".format(csv_in, mean))
    # normalise all values to mean
    df = df / mean
    suffix = "in.csv"
    if csv_in.endswith(suffix):
        csv_out = csv_in[:-len(suffix)] + "out.csv"
    else:
        csv_out = os.path.splitext(csv_in)[0] + "_out.csv"
    df.to_csv(csv_out)
if __name__ == '__main__':
    # Logging settings as a JSON string, standing in for settings loaded from
    # your favourite JSON logging config file (collapsed for brevity).
    logsettings = json.dumps({
        "version": 1,
        "root": {"handlers": ["console", "file"], "level": "DEBUG"},
        "formatters": {
            "detailed": {
                "class": "logging.Formatter",
                "format": "%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s",
            },
        },
        "handlers": {
            "console": {"class": "logging.StreamHandler", "level": "DEBUG"},
            "file": {
                "mode": "w",
                "formatter": "detailed",
                "class": "logging.FileHandler",
                "filename": "my_multiprocessing_logfile.log",
            },
        },
    })
    config = json.loads(logsettings)
    # replace default logfile with a filename containing the exact time
    config['handlers']['file']['filename'] = time.strftime("%Y%m%d_%H_%M_%S") + "_mp_logfile.txt"
    # Load the logging settings into the main process; the listener thread
    # writes the workers' records through these handlers.
    logging.config.dictConfig(config)
    logger = logging.getLogger("main logger")
    queue = multiprocessing.Queue()
    # Maximum number of worker processes running at the same time
    # (i.e. CPU cores used).
    num_processes = 4
    # create 10 csv files with data, and return the list of filepaths
    list_10_infiles = create_10_infiles()
    # Start the listener before the workers so the queue is drained while
    # they run: a process that has put data on a multiprocessing.Queue may
    # not terminate until that data has been flushed to the pipe.
    logger_process = threading.Thread(target=logger_thread, args=(queue,))
    logger_process.start()
    try:
        # Process every input file, at most num_processes at a time.
        for batch_start in range(0, len(list_10_infiles), num_processes):
            workers = []
            batch = list_10_infiles[batch_start:batch_start + num_processes]
            for file_number, infile in enumerate(batch, start=batch_start + 1):
                wp = multiprocessing.Process(target=worker_process,
                                             name='worker_{}'.format(file_number),
                                             args=(queue, infile))
                workers.append(wp)
                wp.start()
            # Wait for this batch to finish before starting the next one.
            for wp in workers:
                wp.join()
                if wp.exitcode != 0:
                    logger.error("{} exited with code {}".format(wp.name, wp.exitcode))
        logger.info("CPUs used = {}/{}".format(num_processes, multiprocessing.cpu_count()))
        logger.info('Program is finished. All files analysed.')
    finally:
        # Tell the logging thread to finish up, even if something above failed;
        # otherwise the (non-daemon) listener thread would keep the program alive.
        queue.put(None)
        logger_process.join()