2017-11-05 23 views
1

だから私のコードでは、私はこのようなものがあります:マルチプロセッシングを使用してHDF5ファイルに書き込む方法は?

import tables 
import bson 

def proc(): 
    data = bson.decode_file_iter(open('file.bson'), 'rb')) 
    atom = tables.Float64Atom() 
    f = tables.open_file('t.hdf5', mode='w') 
    array_c = f.create_earray(f.root, 'data', atom, (0, m)) 

    for c,d in enumerate(data): 
     for e,p in enumerate(d['id']): 
      x = some_array1bym() 
      array_c.append(x) 

    f.close() 

これは正常に動作しますが、私はマルチプロセッシングでこれを右にしたいが、私はこれに新しいですから、私は私が見つけた、まさにこれを行う方法を知りません

def proc(): 
    NCORE = 6    
    data = bson.decode_file_iter(open('file.bson'), 'rb')) 
    atom = tables.Float64Atom() 
    f = tables.open_file('t.hdf5', mode='w') 
    array_c = f.create_earray(f.root, 'data', atom, (0, m)) 

    def process(q, iolock): 
     while True: 
      d = q.get() 
      if d is None: 
       break 
      for e, p in enumerate(d['id']): 
       x = some_array1bym() 
       array_c.append(x) 

    q = mp.Queue(maxsize=NCORE) 
    iolock = mp.Lock() 
    pool = mp.Pool(NCORE, initializer=process, initarg=(q,iolock)) 

    for c,d in enumerate(data): 
     q.put(d) 

    for _ in range(NCORE): 
     q.put(None) 
    pool.close() 
    pool.join() 

    f.close() 

これは私に空のファイルを与えます。

誰でも手伝いできますか?

ありがとう!

+1

: * HDF5はデータベースではありません私は何をする必要がありますが、次のようなものにあなたのprocess機能を変更することだと思います。追記型、読み込み型のデータセットに最適です。ファイルはいつでもファイルに追加できますが、複数のライターが同時にそのようにすると、ファイルが破損する可能性があります。* – hoefling

+0

あなたのサンプルを実行できないため、何がうまくいかないかを正確に伝えるのは難しいです。おそらく、 'data'と' some_array1bym'を適切なプレースホルダに置き換えて、他のユーザがあなたのコードをテストできるようにすることができます。 – myrtlecat

答えて

0

multiprocessing.Poolを少し誤解しました。 Poolを初期化すると、ワーカープロセスNが起動します。 initializer引数は、起動時に各ワーカープロセスで一度だけ実行される関数です。プロセスが後で実行するのはタスクではありません。 Pool.mapまたはPool.apply(またはそれらの非同期補完)のようなメソッドを使用して、実際にジョブをプールに送信して処理します。

+0

OPの例では、 'initializer'関数はすべての作業を行います(キューからタスクが' None'を見るまで)ので、これは問題ではないと思います。 – myrtlecat

+0

そうではありません。待ち行列は呼び出されたときに空であるため、すぐに戻ります。 – Iguananaut

+0

キューが空の場合、 'q.get()'の呼び出しは取得する項目があるまでブロックされます。 'initializer'メソッドは' if d is None:break'条件が満たされるまで戻りません。 – myrtlecat

0

私は問題がarray_c変数と関係している可能性があると思います。プールフォークの後、各作業者はコピーをこの変数にします。これは、a)array_cのこれらのコピーのいずれかがhdf5ファイルへの書き込みを試み、未定義の結果を与えるか、またはb)メインプロセスのコピーのみが空であり、f.close()が呼び出されたときにファイルに書き込みます。私はpytablesの内部について熟知していないのかどうかはわかりません。

array_cとは対照的に、qiolockは、すべてのワーカーとメインスレッドで共有されます。 qmp.Queueインスタンスであり、iolockmp.Lockインスタンスであり、これらのクラスは複数のプロセスで一度に使用できるように特別に設計されています。私は同じことがpytablesクラスの真実だとは思わない。

mp.Lockインスタンスを使用して、一度に1つのプロセスだけがファイルに書き込むようにする必要があります。著書「データ分析のためのPython」から

ウェス・マッキニーを引用
def process(q, iolock): 
    while True: 
     d = q.get() 
     if d is None: 
      break 
     for e, p in enumerate(d['id']): 
      x = some_array1bym() 
      # acquire lock to ensure only one process writes data at once 
      iolock.acquire() 
      # get new handle to hdf5 file in append mode 
      f = tables.open_file('t.hdf5', mode='a') 
      # get new handle to data set and append new data 
      array_c = tables.EArray(f.root, 'data') 
      array_c.append(x) 
      # close file and release lock to allow another process to write 
      f.close() 
      iolock.release() 
関連する問題