私はPythonでマルチプロセッシングを学びたいと考えています。 私は、各プロセスにtxt入力ファイルから1000行分のデータを送る簡単なコードを書いています。私のmain関数は、行を読み込んでそれを分割し、その文字列の要素を使って非常に簡単な操作を実行します。最終的に結果は出力ファイルに書き込まれます。Pythonのマルチプロセスは1つのプロセスしか使用しません
私が実行すると、4つのプロセスが正しく生成されますが、実際には最小限のCPUで実行されているプロセスは1つだけです。その結果、コードは非常に遅く、最初にマルチプロセッシングを使用する目的に反します。 私はこの質問(python multiprocessing apply_async only uses one process)のようなグローバルリストの問題はないと思うし、私の機能はあまりにも些細なものではないと思う(Python multiprocessing.Pool() doesn't use 100% of each CPU)。
私が間違っていることを理解することはできません。何か助けや助言をいただければ幸いです。私が提案(冗長データに前処理を削除)、次の自分のコードを修正
import multiprocessing
import itertools
def myfunction(line):
returnlist=[]
list_of_elem=line.split(",")
elem_id=list_of_elem[1]
elem_to_check=list_of_elem[5]
ids=list_of_elem[2].split("|")
for x in itertools.permutations(ids,2):
if x[1] == elem_to_check:
returnlist.append(",".join([elem_id,x,"1\n"]))
else:
returnlist.append(",".join([elem_id,x,"0\n"]))
return returnlist
def grouper(n, iterable, padvalue=None):
return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=padvalue)
if __name__ == '__main__':
my_data = open(r"my_input_file_to_be_processed.txt","r")
my_data = my_data.read().split("\n")
p = multiprocessing.Pool(4)
for chunk in grouper(1000, my_data):
results = p.map(myfunction, chunk)
for r in results:
with open (r"my_output_file","ab") as outfile:
outfile.write(r)
編集:ここでは基本的なコードです。しかし、問題はまだそこにあるようです。コードのスニペットによると
import multiprocessing
import itertools
def myfunction(line):
returnlist=[]
list_of_elem=line.split(",")
elem_id=list_of_elem[1]
elem_to_check=list_of_elem[5]
ids=list_of_elem[2].split("|")
for x in itertools.permutations(ids,2):
if x[1] == elem_to_check:
returnlist.append(",".join([elem_id,x,"1\n"]))
else:
returnlist.append(",".join([elem_id,x,"0\n"]))
return returnlist
if __name__ == '__main__':
my_data = open(r"my_input_file_to_be_processed.txt","r")
p = multiprocessing.Pool(4)
results = p.map(myfunction, chunk, chunksize=1000)
for r in results:
with open (r"my_output_file","ab") as outfile:
outfile.write(r)
すべてのあなたの外側のループは、労働者のチャンクラインを配布します 'p.map'として私には無意味なようです。 'Pool.map'がすでに' chunksize'パラメータを持っているとき、なぜデータを手でスライスするのですか? – robyschek
私はあなたのデータを正しく準備していないと思います。以前に適切な数のチャンクでデータセットが分割されている場合は、 'p.map(func、dataset)'のようなもので 'Pool.map'を一度呼び出すか、' p.map( ' func、dataset、chunksize) 'を返します。 (あなたのループに 'Pool.map'を置くと、同時にチャンクの代わりにそれぞれのチャンクが順番に計算されます)。 – mgc
チャンクに関する提案をお寄せいただきありがとうございます。 @robyscheck:私はまだ基本機能が実行されている単線でチャンクを分割する必要があると思いますか? – user2447387