Python multiprocessing: process a list of input files with error logging

I would like to use Python multiprocessing to do the following:
- process a long list of input files
- include error logging
- set the number of CPU cores used concurrently (i.e. the number of processes)
The Python logging cookbook has two excellent multiprocessing examples. In the code below I have adapted the second approach ("logging to the main process from a separate thread"), which uses a multiprocessing.Queue. For my own benefit, and for other new users, I have added detailed comments and created sample input/output files.
Where I am stuck: the code iterates over the number of CPU cores, rather than over the number of items in my list of files.
How can I apply this function to all of the input files, without exceeding the limit on the number of concurrent processes?
import json
import logging
import logging.config
import logging.handlers
import multiprocessing
import numpy as np
import os
import pandas as pd
import threading
import time
def create_10_infiles():
    """Creates 10 csv files with a 4x4 array of floats, + occasional strings."""
    list_csv_in = []
    for i in range(1, 11):
        csv_in = "{:02d}_in.csv".format(i)
        # create a 4 row, 4 column dataframe with random values scaled by i
        df = pd.DataFrame(np.random.rand(16).reshape(4, 4) * i)
        # add a string to one of the arrays (as a reason to need error logging)
        if i == 2 or i == 8:
            df.loc[2, 2] = "Oops, array contains a string. Welcome to data science."
        # save to csv, and append filename to list of input files
        df.to_csv(csv_in)
        list_csv_in.append(csv_in)
    return list_csv_in
def logger_thread(queue):
    """Listener thread in the main process that handles log records received from the worker processes."""
    while True:
        record = queue.get()
        # None is the sentinel sent by the main process to shut the listener down
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)
def worker_process(queue, infile):
    """Worker process that is used to run tasks.
    Each process is isolated, so it starts by setting up its own logging."""
    # set up a handler that passes log records to the shared queue
    queue_handler = logging.handlers.QueueHandler(queue)
    # create a new logger called "process logger" (printed in each line)
    logger = logging.getLogger("process logger")
    # set the logging level to DEBUG, so logger.info messages are printed
    logger.setLevel(logging.DEBUG)
    # connect the logger to the queue handler defined above
    logger.addHandler(queue_handler)
    # here you can run your desired program, in the hope that the time saved from parallel
    # processing is greater than the overhead of setting up all those processes and loggers :)
    normalise_array_to_mean_and_save(infile, logger)
def normalise_array_to_mean_and_save(csv_in, logger):
    """Opens csv with array, checks dtypes, calculates mean, saves output csv."""
    # check if file exists
    if os.path.isfile(csv_in):
        # open as pandas dataframe
        df = pd.read_csv(csv_in)
        # if none of the columns contain mixed datatypes (i.e. a string)
        if np.dtype('object') not in df.dtypes.tolist():
            # calc mean over whole dataframe
            mean = df.stack().mean()
            logger.info("{}, Mean = {:0.2f}".format(csv_in, mean))
            # normalise all values to the mean. Save as "01_out.csv", "02_out.csv" etc
            df = df / mean
            csv_out = csv_in[:-6] + "out.csv"
            df.to_csv(csv_out)
        else:
            logger.info("{}, Mean not calculated. Non-float values found.".format(csv_in))
if __name__ == '__main__':
    os.chdir(r"D:\data")
    # import your favourite json logging settings (collapsed for brevity)
    logsettings = json.dumps({"version": 1, "root": {"handlers": ["console", "file"], "level": "DEBUG"}, "formatters": {"detailed": {"class": "logging.Formatter", "format": "%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s"}}, "handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}, "file": {"mode": "w", "formatter": "detailed", "class": "logging.FileHandler", "filename": "my_multiprocessing_logfile.log"}}})
    config = json.loads(logsettings)
    # replace default logfile with a filename containing the exact time
    config['handlers']['file']['filename'] = time.strftime("%Y%m%d_%H_%M_%S") + "_mp_logfile.txt"
    # load the logging settings
    logging.config.dictConfig(config)
    queue = multiprocessing.Queue()
    workers = []
    # set the number of concurrent processes created (i.e. CPU cores used)
    num_processes = 4
    # create 10 csv files with data, and return the list of filepaths
    list_10_infiles = create_10_infiles()
    # set up a process for each CPU core (e.g. 4)
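    # NOTE: this is where I am stuck - only the first num_processes files
    # in list_10_infiles are ever handed to a worker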
    for i in range(num_processes):
        wp = multiprocessing.Process(target=worker_process,
                                     name='worker_{}'.format(i + 1),
                                     args=(queue, list_10_infiles[i]))
        workers.append(wp)
        wp.start()
    # set up a thread as the logger_process
    logger_process = threading.Thread(target=logger_thread, args=(queue,))
    logger_process.start()
    # At this point, the main process could do some useful work of its own.
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # set logger for main process if desired
    root = logging.getLogger("main")
    root.setLevel(logging.DEBUG)
    logger = logging.getLogger("main logger")
    logger.info("CPUs used = {}/{}".format(num_processes, multiprocessing.cpu_count()))
    logger.info('Program is finished. All files analysed.')
    # And now tell the logging thread to finish up, too
    queue.put(None)
    logger_process.join()
Note: I have tried splitting the list of input files into chunks according to the number of CPU cores. This processed all of the files, but it was very slow.
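For reference, that chunking attempt looked roughly like the sketch below. It is an illustrative reconstruction rather than my exact code: worker_process_chunk is a hypothetical variant of worker_process that sets up logging once and then works through a sub-list of files, and the loop is meant to replace the for i in range(num_processes) loop in the main block above (reusing the queue, workers list and helper functions already defined there).
def worker_process_chunk(queue, infile_list):
    """Hypothetical variant of worker_process: set up logging once, then loop over a sub-list of files."""
    queue_handler = logging.handlers.QueueHandler(queue)
    logger = logging.getLogger("process logger")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(queue_handler)
    for infile in infile_list:
        normalise_array_to_mean_and_save(infile, logger)

# inside the if __name__ == '__main__' block, instead of the range(num_processes) loop:
chunks = np.array_split(list_10_infiles, num_processes)  # num_processes roughly equal sub-lists
for i, chunk in enumerate(chunks):
    wp = multiprocessing.Process(target=worker_process_chunk,
                                 name='worker_{}'.format(i + 1),
                                 args=(queue, list(chunk)))
    workers.append(wp)
    wp.start()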