2011-06-30 2 views
4

私は500の入力ファイルを持つフォルダを持っています(すべてのファイルの合計サイズは〜500 [MB]です)。Pythonマルチ処理の質問ですか?

私は、次の処理を行いpythonスクリプト記述したいと思います:(2)は、その空のpythonリストを初期化し、メモリ

(1)ロード入力ファイルのすべてを以降(3)15の異なる(独立した)processeを開始(4)

弾丸を見る使用されますS:[(1)からこれらのそれぞれ同じ入力データを使用する - はまだこのように異なる結果を

を生成し、プロセスにそれを異なるアルゴリズムを使用して(4) Iは、ステップ(3)からすべての独立したプロセスを[たいステップ(2)]で初期化されたのと同じpythonリスト[同じリストに自分の出力を格納するために

一旦、全ての15個のプロセスがその実行を完了した、私はすべての15個の独立したプロセスの結果が含まれてone python listを持つことになります。

私の質問は、pythonで上記を効率的に行うことができますか?もしそうなら、そうする方法を示すスキーム/サンプルコードを提供できますか?

注#1:これは強力なマルチコアサーバーで実行します。ここでの目標は、すべての独立したプロセスの中でいくつかのメモリ{input dataoutput list}を共有しながら、すべての処理能力を使用することです。

ノート#2:私はちょうど、複数の出版社に単一の加入者を証明するためにzeromqを使用して、これを手早くOK Linux環境

+0

これがすべてでうまく仕事に行くのではありません取得します。 1つのリストに対するすべての競合で、おそらく並行性からの利益の多くを見ないでしょう。 –

+0

@Rafe Kettler:何をお勧めしますか?ここでの私の目標は、処理の最後に結果を「集約」できることです。代替案は何ですか? (物事を明確にするために、出力リストのサイズは〜100 [MB]になります)。 – user3262424

+0

私はzeromqで動作しているのを見ることができます。複数の出版社、1人の加入者。加入者がリストを管理する。 私が持っている質問は#1です:テーブルの3番目のライブラリですか? #2:出力はどのように表現されますか? –

答えて

5

で働いています。あなたはおそらくキューと同じことをすることができますが、それらをもう少し管理する必要があります。 zeromqソケットはこのIMOのようなものにはうってつけです。

""" 
demo of multiple processes doing processing and publishing the results 
to a common subscriber 
""" 
from multiprocessing import Process 


class Worker(Process): 
    def __init__(self, filename, bind): 
     self._filename = filename 
     self._bind = bind 
     super(Worker, self).__init__() 

    def run(self): 
     import zmq 
     import time 
     ctx = zmq.Context() 
     result_publisher = ctx.socket(zmq.PUB) 
     result_publisher.bind(self._bind) 
     time.sleep(1) 
     with open(self._filename) as my_input: 
      for l in my_input.readlines(): 
       result_publisher.send(l) 

if __name__ == '__main__': 
    import sys 
    import os 
    import zmq 

    #assume every argument but the first is a file to be processed 
    files = sys.argv[1:] 

    # create a worker for each file to be processed if it exists pass 
    # in a bind argument instructing the socket to communicate via ipc 
    workers = [Worker(f, "ipc://%s_%s" % (f, i)) for i, f \ 
       in enumerate((x for x in files if os.path.exists(x)))] 

    # create subscriber socket 
    ctx = zmq.Context() 

    result_subscriber = ctx.socket(zmq.SUB) 
    result_subscriber.setsockopt(zmq.SUBSCRIBE, "") 

    # wire up subscriber to whatever the worker is bound to 
    for w in workers: 
     print w._bind 
     result_subscriber.connect(w._bind) 

    # start workers 
    for w in workers: 
     print "starting workers..." 
     w.start() 

    result = [] 

    # read from the subscriber and add it to the result list as long 
    # as at least one worker is alive 
    while [w for w in workers if w.is_alive()]: 
     result.append(result_subscriber.recv()) 
    else: 
     # output the result 
     print result 

ああとzmqがちょうど

$ pip install pyzmq-static 
+0

@Tom Willis:ありがとう!上記のコードはサーバーのすべての処理能力を使用しますか?また、その出力を格納する 'list'がディスクにアクセスすることなくメモリ内で操作されることを確認できますか?また、linuxコマンド 'top'を実行すると、' python'プロセスが1つ、または15プロセスが表示されますか? – user3262424

+0

あなたが実行している各作業者+メインプロセスの上に、そのリストがメモリにある上のコードでプロセスが上に表示されます。上記のスクリプトがディスクにアクセスする唯一の時間は、引数として渡されるファイルを読み取ることです。私は、単にプロセス間通信がどのように行われ、0mqでどの程度簡単にできるのかを実証したかっただけです。 –

+0

最終的なことは明らかではないかもしれません。プロセス間のメッセージは文字列でなければなりません。あなたのデータにもっと構造が必要な場合は、json –