0

私は複雑なUDFを書く必要があります。そこでは、別のテーブルとの結合を行い、一致数を返します。実際のユースケースははるかに複雑ですが、ここではケースを最小限の再現可能なコードに単純化しました。ここにUDFコードがあります。Spark:UDFまたはマップ関数内で結合する

def predict_id(date,zip): 
    filtered_ids = contest_savm.where((F.col('postal_code')==zip) & (F.col('start_date')>=date)) 
    return filtered_ids.count() 

私は以下のコードを使用してUDFを定義するとき、私はコンソールエラーの長いリストを取得:

predict_id_udf = F.udf(predict_id,types.IntegerType()) 

のエラーの最後の行は次のとおりです。

py4j.Py4JException: Method __getnewargs__([]) does not exist 

私が欲しいです何が最善の方法であるかを知ることができます。また、同様の最終エラーになった

result_rdd = df.select("party_id").rdd\ 
    .map(lambda x: predict_id(x[0],x[1]))\ 
    .distinct() 

:私はまた、このようなmapを試してみました。とにかく、元のデータフレームの各行に対して、UDFまたはマップ関数内で結合を行うことができます。

答えて

0

私は複雑なUDFを書く必要があります。そこでは、別のテーブルとの結合を行い、一致数を返します。

これは設計上可能ではありません。私は高レベルのDF/RDDオペレータを使用しなければならないこのような効果を達成したい:

df.join(ontest_savm, 
    (F.col('postal_code')==df["zip"]) & (F.col('start_date') >= df["date"]) 
).groupBy(*df.columns).count() 
関連する問題