2017-03-05 7 views
0

pysparkでpandas .apply(関数、軸= 1)(行優先関数を適用する)を実装する必要があります。私が初心者であるため、マップ関数またはUDFを使用して実装できるかどうかはわかりません。私はどこでも同じような実装を見つけることができません。pysparkのデータフレームの行単位の操作または行ごとのUDF

基本的には、関数に行を渡して、現在の行と前の行の値に依存する新しい列を作成し、変更された行を戻して新しいデータフレームを作成する操作が必要です。 PREV_COL_Aは、1行分遅れCOL_A過ぎないここ

previous = 1 
def row_operation(row): 
    global previous 
    if pd.isnull(row["PREV_COL_A"])==True or (row["COL_A"]) != (row["PREV_COL_A"]): 
     current = 1 
    elif row["COL_C"] > cutoff: 
     current = previous +1 
    elif row["COL_C"]<=cutoff: 
     current = previous 
    else: 
     current = Nan 
    previous = current 
    return current 

:パンダと一緒に使用関数の つを以下に示します。

この関数は最も簡単で行を返さないが、他の関数は返さないことに注意してください。 誰かがpysparkで行操作を実装する方法について私を導くことができれば、大きな助けになるでしょう。 TIA

答えて

0

rdd.mapPartitionを使用できます。これは、行の反復子を返し、返す結果の行を返します。あなたが与えられた反復可能性は、インデックスを前進または後退させることを許さず、単に次の行を返します。ただし、必要な処理を行う際には、行を節約することができます。例

def my_cool_function(rows): 
    prev_rows = [] 

    for row in rows: 
     # Do some processing with all the rows, and return a result 
     yield my_new_row 

     if len(prev_rows) >= 2: 
      prev_rows = prev_rows[1:] 

     prev_rows.append(row) 

updated_rdd = rdd.mapPartitions(my_cool_function) 

注のために、私は例のためのパーティションを追跡するためにリストを使用しますが、Pythonのリストは本当に効率的なヘッドプッシュ/ポップ・メソッドを持っていない配列ですので、あなたは、おそらく使用したいと思うでしょう実際のキュー

関連する問題