2016-08-02 7 views
3

のpythonパンダライブラリには、以下の機能が含まれています:パンダのvalue_counts()関数と組み合わせSpark DataframeにはPandaのマージインジケータと同等のオプションがありますか?

DataFrame.merge(right, how='inner', on=None, left_on=None, right_on=None, left_index=False, 
       right_index=False, sort=False, suffixes=('_x', '_y'), copy=True, 
       indicator=False) 

インジケータフィールドは、すぐに参加を行ってどれだけかを決定するために使用することができます。

例:

In [48]: df1 = pd.DataFrame({'col1': [0, 1], 'col_left':['a', 'b']}) 

In [49]: df2 = pd.DataFrame({'col1': [1, 2, 2],'col_right':[2, 2, 2]}) 

In [50]: pd.merge(df1, df2, on='col1', how='outer', indicator=True) 
Out[50]: 
    col1 col_left col_right  _merge 
0  0  a  NaN left_only 
1  1  b  2.0  both 
2  2  NaN  2.0 right_only 
3  2  NaN  2.0 right_only 

スパークデータフレーム内の参加のパフォーマンスをチェックするための最良の方法は何ですか?

カスタム関数は、答えの1で提供されていた:それはまだ正しい結果を得られませんが、それは希望の場合、それは素晴らしいことだ:

ASchema = StructType([StructField('id', IntegerType(),nullable=False), 
       StructField('name', StringType(),nullable=False)]) 
BSchema = StructType([StructField('id', IntegerType(),nullable=False), 
       StructField('role', StringType(),nullable=False)]) 
AData = sc.parallelize ([ Row(1,'michel'), Row(2,'diederik'), Row(3,'rok'), Row(4,'piet')]) 
BData = sc.parallelize ([ Row(1,'engineer'), Row(2,'lead'), Row(3,'scientist'), Row(5,'manager')]) 
ADF = hc.createDataFrame(AData,ASchema) 
BDF = hc.createDataFrame(BData,BSchema) 
DFJOIN = ADF.join(BDF, ADF['id'] == BDF['id'], "outer") 
DFJOIN.show() 

Input: 
+----+--------+----+---------+ 
| id| name| id|  role| 
+----+--------+----+---------+ 
| 1| michel| 1| engineer| 
| 2|diederik| 2|  lead| 
| 3|  rok| 3|scientist| 
| 4| piet|null|  null| 
|null| null| 5| manager| 
+----+--------+----+---------+ 

from pyspark.sql.functions import * 
DFJOINMERGE = DFJOIN.withColumn("_merge", when(ADF["id"].isNull(), "right_only").when(BDF["id"].isNull(), "left_only").otherwise("both"))\ 
    .withColumn("id", coalesce(ADF["id"], BDF["id"]))\ 
    .drop(ADF["id"])\ 
    .drop(BDF["id"]) 
DFJOINMERGE.show() 

Output 
+---+--------+---+---------+------+ 
| id| name| id|  role|_merge| 
+---+--------+---+---------+------+ 
| 1| michel| 1| engineer| both| 
| 2|diederik| 2|  lead| both| 
| 3|  rok| 3|scientist| both| 
| 4| piet| 4|  null| both| 
| 5| null| 5| manager| both| 
+---+--------+---+---------+------+ 

==> I would expect id 4 to be left, and id 5 to be right. 

Changing join to "left": 


Input: 
+---+--------+----+---------+ 
| id| name| id|  role| 
+---+--------+----+---------+ 
| 1| michel| 1| engineer| 
| 2|diederik| 2|  lead| 
| 3|  rok| 3|scientist| 
| 4| piet|null|  null| 
+---+--------+----+---------+ 

Output 
+---+--------+---+---------+------+ 
| id| name| id|  role|_merge| 
+---+--------+---+---------+------+ 
| 1| michel| 1| engineer| both| 
| 2|diederik| 2|  lead| both| 
| 3|  rok| 3|scientist| both| 
| 4| piet| 4|  null| both| 
+---+--------+---+---------+------+ 

答えて

3

はこれを試してみてください:

>>> from pyspark.sql.functions import * 
>>> sdf1 = sqlContext.createDataFrame(df1) 
>>> sdf2 = sqlContext.createDataFrame(df2) 
>>> sdf = sdf1.join(sdf2, sdf1["col1"] == sdf2["col1"], "outer") 
>>> sdf.withColumn("_merge", when(sdf1["col1"].isNull(), "right_only").when(sdf2["col1"].isNull(), "left_only").otherwise("both"))\ 
... .withColumn("col1", coalesce(sdf1["col1"], sdf2["col1"]))\ 
... .drop(sdf1["col1"])\ 
... .drop(sdf2["col1"]) 
+0

ありがとう。私はそれをテストし、それは毎回同じように、私は質問にテストコードを入れます。 – mnos

+0

問題を再現できません。どちらの例も私にとってうまくいく。 –

+0

興味深いのは、どのsparkとpythonのバージョンを使用していますか? Jupiterのノートブックを使用してSpark 1.6とPython 2,7を使用します。まったく同じサンプルコードを実行すると、バージョンの問題が発生する可能性があります。 – mnos

4

変更されましたLostInOverflowの答えは、この作業を得ました:

from pyspark.sql import Row 

ASchema = StructType([StructField('ida', IntegerType(),nullable=False), 
       StructField('name', StringType(),nullable=False)]) 
BSchema = StructType([StructField('idb', IntegerType(),nullable=False), 
       StructField('role', StringType(),nullable=False)]) 
AData = sc.parallelize ([ Row(1,'michel'), Row(2,'diederik'), Row(3,'rok'), Row(4,'piet')]) 
BData = sc.parallelize ([ Row(1,'engineer'), Row(2,'lead'), Row(3,'scientist'), Row(5,'manager')]) 
ADF = hc.createDataFrame(AData,ASchema) 
BDF = hc.createDataFrame(BData,BSchema) 
DFJOIN = ADF.join(BDF, ADF['ida'] == BDF['idb'], "outer") 
DFJOIN.show() 


+----+--------+----+---------+ 
| ida| name| idb|  role| 
+----+--------+----+---------+ 
| 1| michel| 1| engineer| 
| 2|diederik| 2|  lead| 
| 3|  rok| 3|scientist| 
| 4| piet|null|  null| 
|null| null| 5| manager| 
+----+--------+----+---------+ 

from pyspark.sql.functions import * 
DFJOINMERGE = DFJOIN.withColumn("_merge", when(DFJOIN["ida"].isNull(), "right_only").when(DFJOIN["idb"].isNull(), "left_only").otherwise("both"))\ 
    .withColumn("id", coalesce(ADF["ida"], BDF["idb"]))\ 
    .drop(DFJOIN["ida"])\ 
    .drop(DFJOIN["idb"]) 
#DFJOINMERGE.show() 
DFJOINMERGE.groupBy("_merge").count().show() 

+----------+-----+ 
| _merge|count| 
+----------+-----+ 
|right_only| 1| 
| left_only| 1| 
|  both| 3| 
+----------+-----+ 
関連する問題