2017-08-25 12 views
1

私はマルチスレッドでPythonスクリプトを作成しました。各スレッドは、スレッドが以前存在しなかった新しい一意の値で辞書を更新するため、スレッドセーフであるグローバルdictに値を書き込みます。出力ファイルのdictの結果ですが、 "繰り返しの間に辞書のサイズが変更されました"が表示され、ファイルへのダンプ中に書き込むために辞書をロックするような方法があります。仕事python multithreading save dictionary result

def do_function(): 
    while True: 
     r=q.get() 
     global_dict[r]={} --> this is thread safe as r is unique it will not repeat again 
     telephone,address=get_info(r) 
     global_dict[r]['t']=telephone 
     global_dict[r]['a']=address 

     with open("output.pickle","wb") as j: --> save to file 
       pickle.dump(global_dict,j) --> receive error dictionary changed size during iteration 

     q.task_done() 

global dict={} 
thread=10 
q = Queue(threads * 2) 
for i in range(concurrent): 
    t = Thread(target=do_function) 
    t.daemon = True 
    t.start() 
for p in lst: 
     q.put(p) 
    q.join() 
+0

あなたが指定した2行のコードは無効なPythonであり、[mcve]も作成されません。何がうまくいかなかったか教えてください。 –

+0

重複していますか? https://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python – Alexander

+0

重複していない、私はこれを見て、辞書のどの操作がスレッドセーフであり、どの操作が – Amr

答えて

0

スレッディングにファイルを書き込む必要はありません。そしておそらくそれは誤りです。 グローバルディクテーションなのですべてのスレッドが完了したら、実行することができます。

with open("output.pickle","wb") as j: 
    pickle.dump(global_dict,j) 

ファイルの最後に移動してください。

スレッドがファイルに辞書をダンプし、最初のスレッドがその辞書が

EDITED 1反復

時にサイズを変更し文句を言うので、別のスレッドが、辞書を変更したときにあなたのエラーがによって引き起こされます

私は簡単な解決策はグローバル変数を使用しないと考えると、エラーは発生しません。このような :交換しないを追加するためのファイルを開くには、「AB」モードを使用

import threading 
lock = threading.Lock() 

def do_function(): 
    while True: 
     r=q.get() 
     d={} 
     telephone,address=get_info(r) 
     d['t']=telephone 
     d['a']=address 
     lock.acquire() 
     with open("output.pickle","ab") as j: 
       pickle.dump(d,j) 
     lock.release() 
     q.task_done() 

予告は、「WB」を使用しないでください。

EDITEDロックたびの書き込みを使用して2

ファイルに重いコストを有することができます。回避策は、スレッドごとに異なるファイルへの書き込みを行います。このファイルには、このスレッドに入るときに生成されたuuidによって名前を付けることができます。

さらに速い方法はです。を書き込むときは一括書き込みとロックを使用できます。それは非常に古いmehodよりも速くなります。

サンプルコード:

import threading 
lock = threading.Lock() 

def do_function(): 
    buffer = [] 
    while True: 
     r=q.get() 
     d={} 
     telephone,address=get_info(r) 
     d['t']=telephone 
     d['a']=address 
     buffer.append(d) 
     q.task_done() 

     if len(buffer) >= BATCH_COUNT: 
      lock.acquire() 
      with open("output.pickle","ab") as j: 
        pickle.dump(buffer,j) 
      lock.release() 
      buffer = [] 

BATCH_COUNTが1000または10000またはあなたが好きなものである可能性があります。

+0

はい私は理解しています。すべてのスレッドが終了した直後ではなく、継続的にファイルに書きたいと思います。プログラムが破損した場合、最初から繰り返す必要はなく、残っている場所から継続します。これを行う方法がありますか? – Amr

+0

答えが更新されました – GuangshengZuo

+0

すべてのスレッドが同時にファイルに書き込むと、これはファイルを読むことができなくなり、コンテンツが重複するか、ファイルに書き込むことがスレッドセーフになると思いますか? – Amr