2017-09-29 6 views
1

TLに並列化するDASKを使用した; DR:私たちは、私は、一般的にプロジェクトに取り組んでいる同じHDFHDF読み取り変換-書き込み

からの読み取りと書き込みDASKとパンダのコードを並列化の問題を抱えています読み込み、翻訳(またはデータの結合)、およびこれらのデータの書き込みの3つのステップが必要です。文脈のために、私たちは医療記録を扱っています。医療記録はさまざまな形式でクレームを受け取り、それらを標準化された形式に翻訳し、ディスクに書き直します。理想的には、中間データセットをPython/Pandas経由で後からアクセスできる形式で保存したいと考えています。

現在、HDFをデータストレージ形式として選択しましたが、実行時の問題に問題があります。大規模な人口では、私のコードは現在、数日以上かかることがあります。これは私にDaskを調査させましたが、私は自分の状況にDask bestを適用したことは肯定的ではありません。

以下は、私のワークフローの実例です。うまくいけば、実行時の問題を理解するのに十分なサンプルデータがあります。

読む(この場合は作成します)データ

import pandas as pd 
import numpy as np 
import dask 
from dask import delayed 
from dask import dataframe as dd 
import random 
from datetime import timedelta 
from pandas.io.pytables import HDFStore 

member_id = range(1, 10000) 
window_start_date = pd.to_datetime('2015-01-01') 
start_date_col = [window_start_date + timedelta(days=random.randint(0, 730)) for i in member_id] 

# Eligibility records 
eligibility = pd.DataFrame({'member_id': member_id, 
          'start_date': start_date_col}) 
eligibility['end_date'] = eligibility['start_date'] + timedelta(days=365) 
eligibility['insurance_type'] = np.random.choice(['HMO', 'PPO'], len(member_id), p=[0.4, 0.6]) 
eligibility['gender'] = np.random.choice(['F', 'M'], len(member_id), p=[0.6, 0.4]) 
(eligibility.set_index('member_id') 
.to_hdf('test_data.h5', 
     key='eligibility', 
     format='table')) 

# Inpatient records 
inpatient_record_number = range(1, 20000) 
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in inpatient_record_number] 
inpatient = pd.DataFrame({'inpatient_record_number': inpatient_record_number, 
          'service_date': service_date}) 
inpatient['member_id'] = np.random.choice(list(range(1, 10000)), len(inpatient_record_number)) 
inpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(inpatient_record_number)) 
(inpatient.set_index('member_id') 
.to_hdf('test_data.h5', 
     key='inpatient', 
     format='table')) 

# Outpatient records 
outpatient_record_number = range(1, 30000) 
service_date = [window_start_date + timedelta(days=random.randint(0, 730)) for i in outpatient_record_number] 
outpatient = pd.DataFrame({'outpatient_record_number': outpatient_record_number, 
          'service_date': service_date}) 
outpatient['member_id'] = np.random.choice(range(1, 10000), len(outpatient_record_number)) 
outpatient['procedure'] = np.random.choice(['A', 'B', 'C', 'D'], len(outpatient_record_number)) 
(outpatient.set_index('member_id') 
.to_hdf('test_data.h5', 
     key='outpatient', 
     format='table')) 

翻訳/書き込みデータ

シーケンシャルアプローチ

def pull_member_data(member_i): 
    inpatient_slice = pd.read_hdf('test_data.h5', 'inpatient', where='index == "{}"'.format(member_i)) 
    outpatient_slice = pd.read_hdf('test_data.h5', 'outpatient', where='index == "{}"'.format(member_i)) 
    return inpatient_slice, outpatient_slice 


def create_visits(inpatient_slice, outpatient_slice): 
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER into medical 'visits' 
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier 
    visits_stacked = pd.concat([inpatient_slice, outpatient_slice]).reset_index().sort_values('service_date') 
    visits_stacked.insert(0, 'visit_id', range(1, len(visits_stacked) + 1)) 
    return visits_stacked 


def save_visits_to_hdf(visits_slice): 
    with HDFStore('test_data.h5', mode='a') as store: 
     store.append('visits', visits_slice) 


# Read in the data by member_id, perform some operation 
def translate_by_member(member_i): 
    inpatient_slice, outpatient_slice = pull_member_data(member_i) 
    visits_slice = create_visits(inpatient_slice, outpatient_slice) 
    save_visits_to_hdf(visits_slice) 


def run_translate_sequential(): 
    # Simple approach: Loop through each member sequentially 
    for member_i in member_id: 
     translate_by_member(member_i) 

run_translate_sequential() 

上記のコードは、へ〜9分かかります私のマシンで動かす。

DASKアプローチ

def create_visits_dask_version(visits_stacked): 
    # In reality this is more complicated, using some logic to combine inpatient/outpatient/ER 
    # But for simplicity, we'll just stack the inpatient/outpatient and assign a record identifier 
    len_of_visits = visits_stacked.shape[0] 
    visits_stacked_1 = (visits_stacked 
         .sort_values('service_date') 
         .assign(visit_id=range(1, len_of_visits + 1)) 
         .set_index('visit_id') 
         ) 
    return visits_stacked_1 


def run_translate_dask(): 
    # Approach 2: Dask, with individual writes to HDF 
    inpatient_dask = dd.read_hdf('test_data.h5', 'inpatient') 
    outpatient_dask = dd.read_hdf('test_data.h5', 'outpatient') 
    stacked = dd.concat([inpatient_dask, outpatient_dask]) 
    visits = stacked.groupby('member_id').apply(create_visits_dask_version) 
    visits.to_hdf('test_data_dask.h5', 'visits') 

run_translate_dask() 

このDASKアプローチ13秒かかり

これは大きな改善ですが、我々は一般的にいくつかのことについて興味が(!):

  1. Daskデータフレームを使用し、それらを連結し、groupbyを使用/最良のアプローチを適用するというアプローチは、この単純な例を考えればわかりますか?

  2. 実際には、同じHDFから読み込み、同じHDFに書き込むこのような複数のプロセスがあります。オリジナルのコードベースは、ワークフロー全体を一度に1つずつ実行できるように構成されていました(member_id)。それらを並列化しようとすると、小さなサンプルで作業することもありましたが、ほとんどの場合、セグメンテーション違反が発生しました。このようなワークフローを並列化することで、HDFを読み書きすることには、既知の問題はありますか?この例も作成していますが、これが提案をトリガする場合(またはこのコードが誰かに同様の問題に直面するのを助ける場合)には、ここに投稿します。

フィードバックはありません。

答えて

1

一般的に、groupby-applyはかなり遅くなります。このようなデータ、特に制限されたメモリを使用することは、一般的に困難です。

一般的に、寄木張りの形式(dask.dataframeにはto_関数とread_parquet関数があります)を使用することをお勧めします。あなたは、HDFファイルよりもsegfaultを得る可能性がはるかに低いです。

+1

ありがとうございます! HDFがおそらく動作しない恐れがあることを確認しました。Parquetを実験しようとします。助けを感謝する – zukah

関連する問題