2016-11-22 12 views
1

私はこのコードを持っています(私は実際には私の作業コードからコピー貼り付けをしていることを謝ります。その全体がここにあります):Pythonマルチプロセッシングコードはうまく動作しますが終了しません

def init(Q): 
    """Serves to initialize the queue across all child processes""" 
    global q 
    q = Q 

def queue_manager(q): 
    """Listens on the queue, and writes pushed data to file""" 
    while True: 
     data = q.get() 
     if data is None: 
      break 
     key, preds = data 
     with pd.HDFStore(hdf_out, mode='a', complevel=5, complib='blosc') as out_store: 
      out_store.append(key, preds) 

def writer(message): 
    """Pushes messages to queue""" 
    q.put(message) 

def reader(key): 
    """Reads data from store, selects required days, processes it""" 
    try: 
     # Read the data 
     with pd.HDFStore(hdf_in, mode='r') as in_store: 
      df = in_store[key] 
    except KeyError as ke: 
     # Almost guaranteed to not happen 
     return (key, pd.DataFrame()) 
    else: 
     # Executes only if exception is not raised 
     fit_df = df[(df.index >= '2016-09-11') & \ 
        (df.index < '2016-09-25') & \ 
        (df.index.dayofweek < 5)].copy() 
     pre_df = df[(df.index >= '2016-09-18') & \ 
        (df.index < '2016-10-2') & \ 
        (df.index.dayofweek < 5)].copy() 
     del df 
     # model_wrapper below is a custom function in another module. 
     # It works fine. 
     models, preds = model_wrapper(fit_df=fit_df, pre_df=pre_df) 
     if preds is not None: 
      writer((key, preds)) 
      del preds 
    return (key, models) 

def main(): 
    sensors = pd.read_csv('sens_metadata.csv', index_col=[0]) 
    nprocs = int(cpu_count() - 0) 
    maxproc = 10 
    q = Queue() 
    t = Thread(target=queue_manager, args=(q,)) 

    print("Starting process at\t{}".format(dt.now().time())) 
    sys.stdout.flush() 
    t.start() 
    with Pool(processes=nprocs, maxtasksperchild=maxproc, initializer=init, 
       initargs=(q,)) as p: 
     models = p.map(reader, sensors.index.tolist(), 1) 
    print("Processing done at\t{}".format(dt.now().time())) 
    print("\nJoining Thread, and finishing writing predictions") 
    sys.stdout.flush() 
    q.put(None) 
    t.join() 
    print("Thread joined successfully at\t{}".format(dt.now().time())) 
    print("\nConcatenating models and serializing to pickle") 
    sys.stdout.flush() 
    pd.concat(dict(models)).to_pickle(path + 'models.pickle') 
    print("Pickled successfully at\t{}".format(dt.now().time())) 

if __name__ == '__main__': 
    main() 

このコードはひどく偏ったコイン投げのように動作します。ほとんどの場合、動作しない場合もありますが、時には動作します。実行すると、データ全体(すべてkeys)の実行が完了するまでに約2.5時間かかります。すべてのデータを処理しますが、hdf_outファイルのデータが表示されますが、マルチプロセッシングプールは参加しません。すべての子プロセスはアクティブですが、何もしません。私はちょうどプログラムがそれのように掛かっているかもしれない理由を理解していない。

これが発生した場合、私には"Processing done at ...""Joining Thread, ..."というメッセージは表示されません。また、小さなデータセットを与えると、それは終了します。 predsの計算を除外すると終了します。私はmodelsの計算を大幅に修正することなく除外することはできません。これはプロジェクトの残りの部分には役に立たないでしょう。

なぜこのようなことが起こっているのかわかりません。私はLinux(Kubuntu 16.04)を使用しています。

答えて

0

明らかに、kwagを解消すると、問題が解決します。どうして私が明確に理解できないものがあるのですか?私はそれがフォークプロセス(Linuxのデフォルト)とスポーンプロセス(Windowsの唯一のオプション)の区別と関係していると思います。

フォークプロセスmaxtaskperchildは、パフォーマンスがなくても良いので、明らかに必要ありません。 maxtaskperchildを落としてメモリ使用量を改善したことに気付きました。メモリは子プロセスによってホーグされませんが、親プロセスから共有されます。しかし、Windowsを使用しなければならない時には、特に、長いタスクリストでメモリ集約型のタスクを実行しているときに、子プロセスが肥大化するのを防ぐための重要な方法が、でした。

何が起こっているのかよく分かっている方は、この回答を編集してください。

関連する問題