2017-12-16 7 views
0

enter image description herePythonのマルチプロセッシングは、すべてのプロセスが一度

私はチャンクでCSVを読み、4つのプロセスのプールにチャンクを渡していますで動作していません。

pool = Pool(processes=4) 
      chunk_index = 1 
      for df in pd.read_csv(downloaded_file, chunksize=chunksize, compression='gzip', skipinitialspace=True, encoding='utf-8'): 
       output_file_name = output_path + merchant['output_file_format'].format(
        file_index, chunk_index) 
       pool.map(wrapper_process, [ 
         (df, transformer, output_file_name)]) 
       chunk_index += 1 

このコードでは、私は4つのプロセスを連続して実行する必要があります。しかし、以下のhtopのスクリーンショットでは、常に2回実行されています。 1つはhtopそれ自身を命令することです。これは、その時点で1つのpythonプロセスしか実行されていないことを意味します。メモリ使用量から

enter image description here

は、それは4つのチャンクが1つのチャンクを備えるメモリ にロードされたときに、私は2ギガバイト私は一度にプロセッサのために使用することができますどのようにほとんど

でのみ可能になると思う12ギガバイトであります。

+0

私はちょうど私の例では愚かな間違いを修正しました。あなたはそれをテストすることができましたか? – hansaplast

答えて

0

問題は、地図の仕組みが間違っていることです。 the docから:

map(func, iterable[, chunksize]) この方法は、別個のタスクとして処理プールに を送信するチャンクの数に反復可能にチョップ。これらの チャンクの(おおよその)サイズは、chunkksizeを正の整数に設定することで指定できます。

繰り返し可能なので、リストは1つの要素、つまりタプル(df, ...)で提供されます。 しかし、あなたは多くの要素でiterableを提供する必要があります。この作業を行うには、最初のリストを準備してのみ、その後のプロセスに送信 に必要があると思います(ヒントは: あなただけPool()を書くことができますし、Pythonはコア 自体の数を見つける聞かせて)

pool = Pool() 
chunk_index = 1 
list = [] 
for df in pd.read_csv(downloaded_file, 
     chunksize=chunksize, 
     compression='gzip', 
     skipinitialspace=True, 
     encoding='utf-8'): 
    output_file_name = output_path + 
     merchant['output_file_format'].format(file_index, chunk_index) 
    list.append((df, transformer, output_file_name)]) 
    chunk_index += 1 
pool.map(wrapper_process, list) 

しかし、今、あなたは OKかもしれない メモリ内の完全なCSVデータを保持する必要があるという問題があるが、通常ではありません。この問題を回避来て あなたは、キューを使用するように切り替えることができます:あなたは

  • プロセスを開始し、(まだ開始時には空である)は、キューからアイテムを得るためにそれらを教えて空のキューを構築します
  • あなたの主なプロセスでキューを養う(そしておそらくメモリの消費量が屋根に入らないように、キューが長すぎるなっていないことを確認してください)プロセスが自分自身
  • やめるようキューに STOP要素を置きます

the official doc (look at the last example on the page)には良い例がありますが、それはあなたがそれに近づくことを説明しています。

最後の一言:あなたの操作はCPUバウンドであるあなたは確かにいますか?あなたはwrapper_processに 処理の多くを行う(そしておそらくもtransformer)ですか?あなたは自分のプログラムを処理あまりせずに別の のファイルにCSVを分割する場合はIOバウンドとされていないのでCPUは​​マルチプロセッシングはどんな意味がありませんその後、 を拘束して。

+0

これは小さなデータに対しても機能します。私の1チャンクは2GBで、私の記憶に24GBのデータを保持するのは現実的ではありません。どのように地図の仕組みが明確に示されているので、私が受け入れる方法は正しい解決策です。私は 'apply_aysnc'を使って管理し、プールの上限に達すると強制的にresposneを取得しました。ありがとう –

関連する問題