2017-11-27 14 views
2

データフレームがあり、各行に関数を適用したいと考えています。この機能は他のデータフレームに依存します。Spark UDFへの入力としてDataFrameを渡すには?

簡略化した例。 I三個のデータフレーム以下のように持っている:私は最初の行の、すなわちdf_other_1df_other_2からfeat1feat2の一意の上部値を収集するdfの行ごと

df = sc.parallelize([ 
    ['a', 'b', 1], 
    ['c', 'd', 3] 
    ]).toDF(('feat1', 'feat2', 'value')) 

df_other_1 = sc.parallelize([ 
     ['a', 0, 1, 0.0], 
     ['a', 1, 3, 0.1], 
     ['a', 3, 10, 1.0], 
     ['c', 0, 10, 0.2], 
     ['c', 10, 25, 0.5] 
     ]).toDF(('feat1', 'lower', 'upper', 'score')) 

df_other_2 = sc.parallelize([ 
     ['b', 0, 4, 0.1], 
     ['b', 4, 20, 0.5], 
     ['b', 20, 30, 1.0], 
     ['d', 0, 5, 0.05], 
     ['d', 5, 22, 0.9] 
     ]).toDF(('feat1', 'lower', 'upper', 'score')) 

、一意の値が(1、3 、10、4、20、30)。次に、(30,20,10,4,3,1)のように並べ替えて、先頭に1つ上の数字を追加します。 dfはそうのようになる:

df = sc.parallelize([ 
     ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]], 
     ['c', 'd', 3, [26, 25, 22, 10, 5]] 
     ]).toDF(('feat1', 'feat2', 'value', 'lst')) 

そして、dfの行毎及びlstのそれぞれの値のそれぞれについて、Iはdf_other_1df_other_2両方からlstの各値をscoreの和を計算しますupperlowerになります。私の目標は、合計スコアがある閾値(例えば、1.4)を上回る各lstの中で最も低い値を見つけることです。ここで

は、合計スコアを計算する方法です。したがって、最初の行がdfの場合、lstの最初の値は31です。df_other_1feat1の場合、最高のバケットより上になります。したがって、スコアは1になります。df_other_2と同じです。したがって、合計スコアは1 + 1 = 2になります。値が10の場合(最初の行について)、合計スコアは1 + 0.5 = 1.5になります。

これはdfは最後に次のようになります方法です:

df = sc.parallelize([ 
      ['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4], 
      ['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25] 
      ]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value')) 

私は実際にこれらの目標値425を見つけるために探しています。中間ステップは本当に重要ではありません。

============================================== ============================

は、ここで私はこれまで試したものです:

def get_threshold_for_row(feat1, feat2, threshold): 

    this_df_other_1 = df_other_1.filter(col('feat1') == feat1) 
    this_df_other_2 = df_other_2.filter(col('feat1') == feat2) 

    values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()] 
    values_feat_1.append(values_feat_1[-1] + 1) 
    values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()] 
    values_feat_2.append(values_feat_2[-1] + 1) 

    values = values_feat_1 + values_feat_2 
    values = list(set(values)) #Keep unique values 
    values.sort(reverse=True) #Sort from largest to smallest 

    df_1_score = df_2_score = 0 
    prev_value = 10000 #Any large number 
    prev_score = 10000 

    for value in values: 
     df_1_score = get_score_for_key(this_df_other_1, 'feat_1', feat_1, value) 
     df_2_score = get_score_for_key(this_df_other_2, 'feat_1', feat_2, value) 

     total_score = df_1_score + df_2_score 

     if total_score < threshold and prev_score >= threshold: 
      return prev_value 

     prev_score = total_score 
     prev_value = value 


def is_dataframe_empty(df): 
    return len(df.take(1)) == 0 

def get_score_for_key(scores_df, grouping_key, this_id, value): 

    if is_dataframe_empty(scores_df): 
     return 0.0 

    w = Window.partitionBy([grouping_key]).orderBy(col('upper')) 

    scores_df_tmp = scores_df.withColumn("prev_value", lead(scores_df.upper).over(w))\ 
         .withColumn("is_last", when(col('prev_value').isNull(), 1).otherwise(0))\ 
         .drop('prev_value') 

    scores_df_tmp = scores_df_tmp.withColumn("next_value", lag(scores_df_tmp.upper).over(w))\ 
         .withColumn("is_first", when(col('next_value').isNull(), 1).otherwise(0))\ 
         .drop('next_value').cache() 

    grouping_key_score = scores_df_tmp.filter((col(grouping_key) == this_id) & 
           (((value >= col('from_value')) & (value < col('to_value'))) | 
           ((value >= col('to_value')) & (col('is_last') == 1)) | 
           ((value < col('from_value')) & (col('is_first') == 1)) | 
           (col('from_value').isNull()))) \ 
        .withColumn('final_score', when(value <= col('to_value'), col('score')).otherwise(1.0)) \ 
        .collect()[0]['final_score'] 

    return grouping_key_score 

df.rdd.map(lambda r: (r['feat_1'], r['feat_2'])) \ 
    .map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4))) 
    .toDF() 

しかし、私はよ得る: AttributeError: 'Py4JError' object has no attribute 'message'

長いポストに申し訳ありません。何か案は?

答えて

2

私は、データフレームを持っていると私は、各行に関数を適用したいです。この機能は他のデータフレームに依存します。

tl; drこれはUDFでは不可能です。最も広い意味で

、UDFは、(列参照として)は、ゼロ又はそれ以上の列の値を受け入れる関数(実際に触媒式)です。

UDFは、UDFは、ユーザー定義集合関数(UDAF)であれば最も広い場合には全体のデータフレームとすることができるレコードで動作することができます。

UDFで複数のDataFrameを処理する場合は、DataFramesにUDF用に使用する列を含める必要があります(join)。

+1

ありがとうございました!私は「結合」を介して結果を得る方法を考え出すつもりです。 – Stergios

関連する問題