2016-11-02 9 views
5

2つのPySpark DataFrames df1df2があります。Pyspark Dataframe 2つのカラムに適用する

df1= 'a' 
     1  
     2  
     5  

df2= 'b' 
     3 
     6 

そして私は、各df1['a']最も近いdf2['b']値を見つけ、そしてdf1の新しい列として最も近い値を追加します。 、そして結果になります。つまり

df1['a']の各値xのために、私はすべてのy in df2['b'](一つだけy最小距離を達成できることがあると仮定することができます注意してください)のためmin(abx(x-y))を実現yを見つけたいです

'a' 'b' 
1  3 
2  3 
5  6 

こと私は(最小距離を達成する値を求める前に)第1の距離行列を作成するために、次のコードを試みた:

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

def dict(x,y): 
    return abs(x-y) 
udf_dict = udf(dict, IntegerType()) 

sql_sc = SQLContext(sc) 
udf_dict(df1.a, df2.b) 
Column<PythonUDF#dist(a,b)> 

を与えるは、それから私は、エラー/出力を与えることなく、永遠に実行されます

sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b)) 

を試してみました。

私の質問は以下のとおりです。

  1. 私がスパークする新たなんだと、効率的な出力データフレームを構築するための私の方法は何ですか? (私の方法は、すべてのabの値の最初の距離行列を作成して、minを見つける)
  2. 私のコードの最後の行とそれを修正する方法は何ですか?あなたの2番目の質問を皮切り

答えて

5

- あなただけの既存のデータフレームにUDFを適用することができ、私はあなたがこのような何かを考えていたと思う:、

>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show() 
+---+---+--------+ 
| a| b|distance| 
+---+---+--------+ 
| 1| 3|  2| 
| 1| 6|  5| 
| 2| 3|  1| 
| 2| 6|  4| 
| 5| 3|  2| 
| 5| 6|  1| 
+---+---+--------+ 

しかし、この距離を適用するためのより効率的な方法があります

>>> from pyspark.sql.functions import abs 
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b)) 

その後、あなたは計算することで一致する番号を検索することができます:内部absを使用して

関連する問題