2016-04-26 14 views
1

私はPythonでマルチプロセッシングを学びたいと考えています。 私は、各プロセスにtxt入力ファイルから1000行分のデータを送る簡単なコードを書いています。私のmain関数は、行を読み込んでそれを分割し、その文字列の要素を使って非常に簡単な操作を実行します。最終的に結果は出力ファイルに書き込まれます。Pythonのマルチプロセスは1つのプロセスしか使用しません

私が実行すると、4つのプロセスが正しく生成されますが、実際には最小限のCPUで実行されているプロセスは1つだけです。その結果、コードは非常に遅く、最初にマルチプロセッシングを使用する目的に反します。 私はこの質問(python multiprocessing apply_async only uses one process)のようなグローバルリストの問題はないと思うし、私の機能はあまりにも些細なものではないと思う(Python multiprocessing.Pool() doesn't use 100% of each CPU)。

私が間違っていることを理解することはできません。何か助けや助言をいただければ幸いです。私が提案(冗長データに前処理を削除)、次の自分のコードを修正

import multiprocessing 
import itertools 

def myfunction(line): 
     returnlist=[] 
     list_of_elem=line.split(",") 
     elem_id=list_of_elem[1] 
     elem_to_check=list_of_elem[5] 

     ids=list_of_elem[2].split("|") 

     for x in itertools.permutations(ids,2): 
       if x[1] == elem_to_check: 
          returnlist.append(",".join([elem_id,x,"1\n"])) 
       else: 
          returnlist.append(",".join([elem_id,x,"0\n"])) 

     return returnlist  

def grouper(n, iterable, padvalue=None): 
    return itertools.izip_longest(*[iter(iterable)]*n, fillvalue=padvalue) 

if __name__ == '__main__': 
    my_data = open(r"my_input_file_to_be_processed.txt","r") 
    my_data = my_data.read().split("\n") 

    p = multiprocessing.Pool(4) 

    for chunk in grouper(1000, my_data): 
      results = p.map(myfunction, chunk) 
      for r in results: 
       with open (r"my_output_file","ab") as outfile: 
        outfile.write(r) 

編集:ここでは基本的なコードです。しかし、問題はまだそこにあるようです。コードのスニペットによると

import multiprocessing 
import itertools 

def myfunction(line): 
     returnlist=[] 
     list_of_elem=line.split(",") 
     elem_id=list_of_elem[1] 
     elem_to_check=list_of_elem[5] 

     ids=list_of_elem[2].split("|") 

     for x in itertools.permutations(ids,2): 
       if x[1] == elem_to_check: 
          returnlist.append(",".join([elem_id,x,"1\n"])) 
       else: 
          returnlist.append(",".join([elem_id,x,"0\n"])) 

     return returnlist  

if __name__ == '__main__': 
    my_data = open(r"my_input_file_to_be_processed.txt","r") 

    p = multiprocessing.Pool(4) 

    results = p.map(myfunction, chunk, chunksize=1000) 
     for r in results: 
      with open (r"my_output_file","ab") as outfile: 
       outfile.write(r) 
+1

すべてのあなたの外側のループは、労働者のチャンクラインを配布します 'p.map'として私には無意味なようです。 'Pool.map'がすでに' chunksize'パラメータを持っているとき、なぜデータを手でスライスするのですか? – robyschek

+0

私はあなたのデータを正しく準備していないと思います。以前に適切な数のチャンクでデータセットが分割されている場合は、 'p.map(func、dataset)'のようなもので 'Pool.map'を一度呼び出すか、' p.map( ' func、dataset、chunksize) 'を返します。 (あなたのループに 'Pool.map'を置くと、同時にチャンクの代わりにそれぞれのチャンクが順番に計算されます)。 – mgc

+0

チャンクに関する提案をお寄せいただきありがとうございます。 @robyscheck:私はまだ基​​本機能が実行されている単線でチャンクを分割する必要があると思いますか? – user2447387

答えて

0

私は4人の労働者(によって行われる8部にチャンクにファイルをこのような何かをして、計算になるだろうと思いなぜ8つのチャンクと4人の労働者?ただ、私は例のために作られたランダム選択):。

from multiprocessing import Pool 
import itertools 

def myfunction(lines): 
    returnlist = [] 
    for line in lines: 
     list_of_elem = line.split(",") 
     elem_id = list_of_elem[1] 
     elem_to_check = list_of_elem[5] 
     ids = list_of_elem[2].split("|") 

     for x in itertools.permutations(ids,2): 
      returnlist.append(",".join(
       [elem_id,x,"1\n" if x[1] == elem_to_check else "0\n"])) 

    return returnlist 

def chunk(it, size): 
    it = iter(it) 
    return iter(lambda: tuple(itertools.islice(it, size)),()) 

if __name__ == "__main__": 
    my_data = open(r"my_input_file_to_be_processed.txt","r") 
    my_data = my_data.read().split("\n") 

    prep = [strings for strings in chunk(my_data, round(len(my_data)/8))] 
    with Pool(4) as p: 
     res = p.map(myfunction, prep) 

    result = res.pop(0) 
    _ = list(map(lambda x: result.extend(x), res)) 
    print(result)  # ... or do something with the result 

編集: これは、すべての行が同じ方法でフォーマットされているとエラーが生じないと確信していると仮定しています。

あなたのコメントによると、それはあなたの機能に問題が何であるかを確認するために役に立つかもしれない/ multiprocessingせずにそれをテストしたり、かなり大/醜い方法以外で/試みを使用して、ファイルの内容は、ほとんど確かに出力が生成されます(どちらか例外または結果):

def myfunction(lines): 
    returnlist = [] 
    for line in lines: 
     try: 
      list_of_elem = line.split(",") 
      elem_id = list_of_elem[1] 
      elem_to_check = list_of_elem[5] 
      ids = list_of_elem[2].split("|") 

      for x in itertools.permutations(ids,2): 
       returnlist.append(",".join(
        [elem_id,x,"1\n" if x[1] == elem_to_check else "0\n"])) 
     except Exception as err: 
      returnlist.append('I encountered error {} on line {}'.format(err, line)) 

    return returnlist 
+0

ありがとうございます。しかし、これはエラーをスローします:with p。プール(4)をmp_poolとして: AttributeError:_ _exit_ _ – user2447387

+0

Youldは 'with p.Pool(4)..'と代わりに' Pool(4)as p: 'と書くべきでしょう。 – mgc

+0

いいえ、それは明らかに問題ではありません。同じエラーが発生します。 – user2447387

関連する問題