2016-04-25 27 views
1

Pythonで値の配列を読み取り、データ配列の一部を操作するために非同期で複数のプロセスを実行する小さなマルチプロセッシングプログラムを書きました。それぞれの別々のプロセスは、2次元配列のそれ自身の1次元セクションでなければならず、プロセス間で重複はありません。すべてのプロセスが完了すると、共有メモリアレイはファイルに書き出されますが、共有メモリアレイに期待値/計算値は存在しませんが、元の値はまだ残っています。プロセス内の新しい値の割り当てが共有メモリオブジェクトに固執していないようです。おそらく、私の問題を引き起こしていることを私が理解していない(例えば、参照渡しと値渡し)ということが起こっているのでしょうか?Pythonマルチプロセッシング:共有メモリ(numpy)配列が期待どおりに変更されていない

私は多くのワーカープロセスを作成し、JoinableQueueをインスタンス化するProcessorクラスを持っています。各プロセスによって呼び出される関数は、2次元共有メモリ配列のインデックス付きスライス上で動作し、それらの配列値を適切に更新するので、入力(共有メモリ)配列はすべての値を計算の結果に置き換える必要があります。結果の2番目の配列を持つ必要はありません。 main関数は、共有メモリ配列とインデックス値をcompute関数の引数として渡します。これらは、プロセスオブジェクトが作業項目を消費するキューに追加されます。以下のコードは次のとおりです。私は間違って行くよどこ

class Processor: 

    queue = None 

    def __init__(self, 
       number_of_workers=1): 

     # create a joinable queue 
     self.queue = JoinableQueue() 

     # create the processes 
     self.processes = [Process(target=self.compute) for _ in range(number_of_workers)] 
     for p in self.processes: 
      p.start() 

    def add_work_item(self, item): 

     # add the parameters list to the parameters queue 
     self.queue.put(item) 

    def compute(self): 

     while True: 

      # get a list of arguments from the queue 
      arguments = self.queue.get() 

      # if we didn't get one we keep looping 
      if arguments is None: 
       break 

      # process the arguments here 
      data = arguments[0] 
      index = arguments[1] 

      # only process non-empty grid cells, i.e. data array contains at least some non-NaN values 
      if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) or np.isnan(data[:, index]).all(): 

       pass   

      else: # we have some valid values to work with 

       logger.info('Processing latitude: {}'.format(index)) 

       # perform a fitting to gamma  
       results = do_something(data[:, index]) 

       # update the shared array 
       data[:, index] = results 

      # indicate that the task has completed 
      self.queue.task_done() 

    def terminate(self): 

     # terminate all processes 
     for p in self.processes: 
      p.terminate() 

    def wait_on_all(self): 

     #wait until queue is empty 
     self.queue.join() 

#----------------------------------------------------------------------------------------------------------------------- 
if __name__ == '__main__': 

    try: 

     # log some timing info, used later for elapsed time 
     start_datetime = datetime.now() 
     logger.info("Start time: {}".format(start_datetime, '%x')) 

     # get the command line arguments 
     input_file = sys.argv[1] 
     input_var_name = sys.argv[2] 
     output_file_base = sys.argv[3] 
     month_scale = int(sys.argv[4]) 

     # create the variable name from the indicator, distribution, and month scale 
     variable_name = 'spi_gamma_{}'.format(str(month_scale).zfill(2)) 

     # open the NetCDF files 
     with netCDF4.Dataset(input_file) as input_dataset, \ 
      netCDF4.Dataset(output_file_base + '_' + variable_name + '.nc', 'w') as output_dataset: 

      # read info from the input dataset and initialize the output for writing 

      # create a processor with a number of worker processes 
      number_of_workers = 1 
      processor = Processor(number_of_workers) 

      # for each longitude slice 
      for lon_index in range(lon_size): 

       logger.info('\n\nProcessing longitude: {}\n'.format(lon_index)) 

       # read the longitude slice into a data array  
       longitude_slice = input_dataset.variables[input_var_name][:, lon_index, :] 

       # reshape into a 1-D array 
       original_shape = longitude_slice.shape 
       flat_longitude_slice = longitude_slice.flatten() 

       # convert the array onto a shared memory array which can be accessed from within another process 
       shared_array_base = Array(ctypes.c_double, flat_longitude_slice) 
       shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) 
       shared_array = shared_array.reshape(original_shape) 

       # loop over each latitude point in the longitude slice 
       for lat_index in range(lat_size): 

        # have the processor process the shared array at this index 
        arguments = [shared_array, lat_index] 
        processor.add_work_item(arguments) 

       # join to the processor and don't continue until all processes have completed 
       processor.wait_on_all() 


       # write the fitted longitude slice values into the output NetCDF 
       output_dataset.variables[variable_name][:, lon_index, :] = np.reshape(shared_array, (time_size, 1, lat_size)) 

      # all processes have completed 
      processor.terminate() 

    except Exception, e: 
     logger.error('Failed to complete', exc_info=True) 
     raise 

誰もが見ることができ、私は期待していて、すなわち、なぜ共有メモリ配列の値が更新されていませんか?

ご意見やご提案ありがとうございます。

UPDATE:私は、この単一のプロセスのために今働いているが、私は、複数のプロセスを起動しようとすると、私はピクルスのエラーを取得:

pickle.PicklingError: Can't pickle '_subprocess_handle' object: <_subprocess_handle object at 0x00000000021CF9F0> 

第二のプロセスが開始されたときにこれは、プロセッサ内、発生。 ()ファンクションです。 1つのプロセス(number_of_workers = 1)を使用して以下のコードを実行すると、このエラーが発生せず、コードが複数のプロセッサを使用していなくても期待通りに実行されます。それはまだ予期しない動作を展示しているが、私は今、成功したソリューションを実装している

class Processor: 

    queue = None 

    def __init__(self, 
       shared_array, 
       data_shape, 
       number_of_workers=1): 

     # create a joinable queue 
     self.queue = JoinableQueue() 

     # keep reference to shared memory array 
     self.shared_array = shared_array 
     self.data_shape = data_shape 

     # create the processes 
     self.processes = [Process(target=self.compute_indicator) for _ in range(number_of_workers)] 
     for p in self.processes: 
      p.start() 

    def add_work_item(self, item): 

     # add the parameters list to the parameters queue 
     self.queue.put(item) 

    def compute_indicator(self): 

     while True: 

      # get a list of arguments from the queue 
      arguments = self.queue.get() 

      # if we didn't get one we keep looping 
      if arguments is None: 
       break 

      # process the arguments here 
      index = arguments[0] 

      # turn the shared array into a numpy array  
      data = np.ctypeslib.as_array(self.shared_array) 
      data = data.reshape(self.data_shape) 

      # only process non-empty grid cells, i.e. data array contains at least some non-NaN values 
      if (isinstance(data[:, index], np.ma.MaskedArray) and data[:, index].mask.all()) \ 
       or np.isnan(data[:, index]).all() or (data[:, index] < 0).all(): 

       pass   

      else: # we have some valid values to work with 

       logger.info('Processing latitude: {}'.format(index)) 

       # perform computation  
       fitted_values = do_something(data[:, index]) 

       # update the shared array 
       data[:, index] = fitted_values 

      # indicate that the task has completed 
      self.queue.task_done() 

    def terminate(self): 

     # terminate all processes 
     for p in self.processes: 
      p.terminate() 

    def wait_on_all(self): 

     #wait until queue is empty 
     self.queue.join() 

#----------------------------------------------------------------------------------------------------------------------- 
if __name__ == '__main__': 

      # create a shared memory array which can be accessed from within another process 
      shared_array_base = Array(ctypes.c_double, time_size * lat_size, lock=False) 

      # create a processor with a number of worker processes 
      number_of_workers = 4 
      data_shape = (time_size, lat_size) 
      processor = Processor(shared_array_base, data_shape, number_of_workers) 

      # for each longitude slice 
      for lon_index in range(lon_size): 

       logger.info('\n\nProcessing longitude: {}\n'.format(lon_index)) 

       # get the shared memory array and convert into a numpy array with proper dimensions 
       longitude_array = np.ctypeslib.as_array(shared_array_base) 
       longitude_array = np.reshape(longitude_array, data_shape) 

       # read the longitude slice into the shared memory array  
       longitude_array[:] = input_dataset.variables[input_var_name][:, lon_index, :] 

       # a list of arguments we'll map to the processes of the pool 
       arguments_iterable = [] 

       # loop over each latitude point in the longitude slice 
       for lat_index in range(lat_size): 

        # have the processor process the shared array at this index 
        processor.add_work_item([lat_index]) 

       # join to the processor and don't continue until all processes have completed 
       processor.wait_on_all() 

       # get the longitude slice of fitted values from the shared memory array and convert 
       # into a numpy array with proper dimensions which we can then use to write to NetCDF 
       fitted_array = np.ctypeslib.as_array(shared_array_base) 
       fitted_array = np.reshape(fitted_array, (time_size, 1, lat_size)) 

       # write the longitude slice of computed values into the output NetCDF 
       output_dataset.variables[variable_name][:, lon_index, :] = fitted_array 

      # all processes have completed 
      processor.terminate() 
+1

自分で書くのではなく、 'multiprocessing.Pool'の' map() 'メソッドを使ってみませんか? –

+1

ちょっと考えましたが、共有配列を作成する前に 'Process'-es *を作成しています。 'Process'-esが実際に共有配列にアクセスできることは確かですか?彼らはただコピーを得ることができるかもしれません... –

+1

'shared_array_base'は、ターゲット' compute'メソッドへの引数として渡されるべきです。実際には、POSIXシステムでは 'fork'を介して継承する必要がありますが、Windowsサポートでは、' mmap'共有メモリの名前と状態を子プロセスにピクルさせパイプすることができるようにする必要があります。各ワーカープロセスで共有配列をNumPy配列としてラップすることができます。あなたが現在やっているように、各作業員にコピーをピケッとパイプしないでください。 – eryksun

答えて

0

:1)それはWindows環境上のすべてのCPU上で実行されていないが、プロセスの合計経過時間にはより高速なシングルプロセッサジョブを実行するよりもあります(すなわち、マルチプロセッシング。*用途)のない同じコード)、2)Linux環境(仮想コンテナ)上でコードを実行するとき、私は利用されている4つのCPUのうちの1つを見るだけです。いずれにしても、私は現在、共有メモリ配列を使用する作業コードを持っています。これは、元の質問についてです。誰かが私が間違っているところを見て、上記の2つの問題につながっているのであれば、コメントにフォローアップしてください。

def compute(args): 

    # extract the arguments 
    lon_index = args[0] 
    lat_index = args[1] 

    # NOT SHOWN 
    # get the data_shape value 

    # turn the shared array into a numpy array 
    data = np.ctypeslib.as_array(shared_array) 
    data = data.reshape(data_shape) 

    # perform the computation, update the indexed array slice 
    data[:, lon_index, lat_index] = perform_computation(data[:, lon_index, lat_index]) 

def init_process(array): 

    # put the arguments to the global namespace 
    global shared_array 
    shared_array = array 


if __name__ == '__main__': 

    # NOT SHOWN 
    # get the lat_size, lon_size, time_size, lon_stride, and data_shape values 

    # create a shared memory array which can be accessed from within another process 
    shared_array = Array(ctypes.c_double, time_size * lon_stride * lat_size, lock=False) 
    data_shape = (time_size, lon_stride, lat_size) 

    # create a processor with a number of worker processes 
    number_of_workers = cpu_count() 

    # create a Pool, essentially forking with copies of the shared array going to each pooled/forked process 
    pool = Pool(processes=number_of_workers, 
       initializer=init_process, 
       initargs=(shared_array)) 

    # for each slice 
    for lon_index in range(0, lon_size, lon_stride): 

     # convert the shared memory array into a numpy array with proper dimensions 
     slice_array = np.ctypeslib.as_array(shared_array) 
     slice_array = np.reshape(slice_array, data_shape) 

     # read the longitude slice into the shared memory array  
     slice_array[:] = read_data(data_shape) 

     # a list of arguments we'll map to the processes of the pool 
     arguments_iterable = [] 

     # loop over each latitude point in the longitude slice 
     for lat_index in range(lat_size): 

      for i in range(lon_stride): 

       # have the processor process the shared array at this index 
       arguments = [i, lat_index] 
       arguments_iterable.append(arguments) 

       # map the arguments iterable to the compute function 
       pool.map(compute, arguments_iterable) 

       # get the longitude slice of fitted values from the shared memory array and convert 
       # into a numpy array with proper dimensions which we can then use to write to NetCDF 
       fitted_array = np.ctypeslib.as_array(shared_array) 
       fitted_array = np.reshape(fitted_array, (time_size, lon_stride, lat_size)) 

       # NOT SHOWN 
       # write the slice of computed values to file 

      # all processes have completed, close the pool 
      pool.close() 
関連する問題