2017-02-22 4 views
0

におけるデータ検証効率を向上させる:は私がパンダにCSVからのデータをロードし、このような分野のいくつかの検証を行うパンダ

(1.5s) loans['net_mortgage_margin'] = loans['net_mortgage_margin'].map(lambda x: convert_to_decimal(x)) 
(1.5s) loans['current_interest_rate'] = loans['current_interest_rate'].map(lambda x: convert_to_decimal(x)) 
(1.5s) loans['net_maximum_interest_rate'] = loans['net_maximum_interest_rate'].map(lambda x: convert_to_decimal(x)) 

(48s) loans['credit_score'] = loans.apply(lambda row: get_minimum_score(row), axis=1) 
(< 1s) loans['loan_age'] = ((loans['factor_date'] - loans['first_payment_date'])/np.timedelta64(+1, 'M')).round() + 1 
(< 1s) loans['months_to_roll'] = ((loans['next_rate_change_date'] - loans['factor_date'])/np.timedelta64(+1, 'M')).round() + 1 
(34s) loans['first_payment_change_date'] = loans.apply(lambda x: validate_date(x, 'first_payment_change_date', loans.columns), axis=1) 
(37s) loans['first_rate_change_date'] = loans.apply(lambda x: validate_date(x, 'first_rate_change_date', loans.columns), axis=1) 

(39s) loans['first_payment_date'] = loans.apply(lambda x: validate_date(x, 'first_payment_date', loans.columns), axis=1) 
(39s) loans['maturity_date'] = loans.apply(lambda x: validate_date(x, 'maturity_date', loans.columns), axis=1) 
(37s) loans['next_rate_change_date'] = loans.apply(lambda x: validate_date(x, 'next_rate_change_date', loans.columns), axis=1) 
(36s) loans['first_PI_date'] = loans.apply(lambda x: validate_date(x, 'first_PI_date', loans.columns), axis=1) 

(36s) loans['servicer_name'] = loans.apply(lambda row: row['servicer_name'][:40].upper().strip(), axis=1) 
(38s) loans['state_name'] = loans.apply(lambda row: str(us.states.lookup(row['state_code'])), axis=1) 
(33s) loans['occupancy_status'] = loans.apply(lambda row: get_occupancy_type(row), axis=1) 
(37s) loans['original_interest_rate_range'] = loans.apply(lambda row: get_interest_rate_range(row, 'original'), axis=1) 
(36s) loans['current_interest_rate_range'] = loans.apply(lambda row: get_interest_rate_range(row, 'current'), axis=1) 
(33s) loans['valid_credit_score'] = loans.apply(lambda row: validate_credit_score(row), axis=1) 
(60s) loans['origination_year'] = loans['first_payment_date'].map(lambda x: x.year if x.month > 2 else x.year - 1) 
(< 1s) loans['number_of_units'] = loans['unit_count'].map(lambda x: '1' if x == 1 else '2-4') 
(32s) loans['property_type'] = loans.apply(lambda row: validate_property_type(row), axis=1) 

これらのほとんどは、いくつかは、直接変換row値を見つける機能、あります他の要素に要素を加えていますが、これらはすべてデータフレーム全体で行ごとに実行されます。このコードが書かれたとき、データフレームは十分小さく、これは問題ではありませんでした。しかし、コードは大幅に大きなテーブルを取り込むようになっており、コードのこの部分はあまりにも長い時間を要します。

これを最適化する最適な方法は何ですか?私の最初の考えは、行ごとに行かなければならなかったが、これらの関数/変換をすべて一度行に適用する(つまり、df、do func1、func2、...、func21の行に対して)。しかし、それに対処する最善の方法。ラムダが同じ結果を得るのを避ける方法はありますか?たとえば、ラムダが長い時間かかると仮定しているからです。 Python 2.7を実行することは重要です。

編集:これらの呼び出しのほとんどは、1行あたりほぼ同じ速度で実行されます(いくつかはかなり高速です)。これは277,659行のデータフレームで、サイズの点で80パーセンタイルになっています。

EDIT2:例関数の:

def validate_date(row, date_type, cols): 
    date_element = row[date_type] 
    if date_type not in cols: 
     return np.nan 
    if pd.isnull(date_element) or len(str(date_element).strip()) < 2: # can be blank, NaN, or "0" 
     return np.nan 
    if date_element.day == 1: 
     return date_element 
    else: 
     next_month = date_element + relativedelta(months=1) 
     return pd.to_datetime(dt.date(next_month.year, next_month.month, 1)) 

これは日付オブジェクト(年、月、など)から値を抽出し、最長コール(origination_year)と同様です。例えば、property_typeのようなものは、不規則な値(例えば "N/A"、 "NULL"など)をチェックするだけですが、まだそれぞれを通過するのに少し時間がかかります。

+1

'convert_to_decimal'関数が既にある場合は、' lambda x:convert_to_decimal(x) 'と書くのがちょっと混乱します。これは 'lambda yと書くのと同じです:(lambda x:convert_to_decimal(x))(y)' –

+0

質問1:**何が遅いのですか?あなたは私たちがコードを見ることができない4-5の関数を持っています。いくつかは 'convert_to_decimal'のようです。私は速く、他のものは' validate_date'のようにカラムを渡しているかもしれませんが、推測しかできない。最低でも、 'print(time.time())'コールをいくつか振りかけると便利です。 –

+0

1コールあたりの時間で編集されます。特定の呼び出しが遅いことはありません。それは合計で彼らが理想以上の時間を増やしていることです。少なくとも、すべての行を何度も何度も呼び出さなければ助けにならないように思えますが、8.5分を7.6分に置き換えているかどうかはわかりません。 – user2524282

答えて

0

td; lr:配布処理を考慮してください。改善点は、チャンク内のデータを読み込み、複数のプロセスを使用することです。ソースhttp://gouthamanbalaraman.com/blog/distributed-processing-pandas.html

import multiprocessing as mp 

def process_frame(df): len(x) 

if __name__ == "__main__": 

    reader = read_csv(csv-file, chunk_size=CHUNKSIZE) 
    pool = mp.Pool(4) # use 4 processes 

    funclist = [] 
    for df in reader: 
      # process each data frame 
      f = pool.apply_async(process_frame,[df]) 
      funclist.append(f) 

    result = 0 
    for f in funclist: 
      result += f.get(timeout=10) # timeout in 10 seconds 

    print "There are %d rows of data"%(result) 

別のオプションはGNU parallelを使用するかもしれません。 ここに別の良い例を使用していますGNU parallel

関連する問題