2017-11-09 10 views
1

は私がpysparkデータフレーム(DF)を持っていると仮定します。Pyspark DATAFRAME選択行

----------------------------- 
record_id | foo | bar 
----------------------------- 
1 | random text | random text 
2 | random text | random text 
3 | random text | random text 
1 | random text | random text 
2 | random text | random text 
----------------------------- 

私の最終目標は、.write.jdbc()でMySQLにこれらの行を記述することで、どのIうまくいっている。しかし、これを行う前に、という新しい列をrecord_id列の一意性に基づいて追加してください。

私は似たようなユニークなrecord_id年代を特定する進歩の少し加えました:

df.select('record_id').distinct().rdd.map(lambda r: r[0]) 

をしかし、パンダのデータフレームとは違って、私はこれは私が再利用できるインデックスを持っていると信じていない、それだけに表示されます値にする。私はまだSpark/Pysparkにはかなり新しいです。

次のワークフローを試してみてください。

  1. 別個record_idと行を識別、およびMySQL
  2. に書き込みそして、残りの行を識別し、およびMySQL

それとも新たに追加して、元のDFを変更することも可能であるに書き込みますいくつかの連鎖コマンドに基づいて列unique?私は、MySQLの卸売に書き込むことができ、以下のようなもの:

---------------------------------- 
record_id | foo | bar | unique 
---------------------------------- 
1 | random text | random text | 0 
2 | random text | random text | 0 
3 | random text | random text | 1 # where 1 for boolean True 
1 | random text | random text | 0 
2 | random text | random text | 0 
---------------------------------- 

任意の提案やアドバイスをいただければ幸いです!

答えて

2

あなたはRECORD_IDが一列のみを持っている場合partitionBy RECORD_IDは、ユニークとしてそれをマークした行の数を数えることができます。

from pyspark.sql.window import Window 
import pyspark.sql.functions as F 

df.withColumn("unique", (F.count("record_id").over(Window.partitionBy("record_id")) == 1).cast('integer')).show() 
+---------+-----------+-----------+------+ 
|record_id|  foo|  bar|unique| 
+---------+-----------+-----------+------+ 
|  3|random text|random text|  1| 
|  1|random text|random text|  0| 
|  1|random text|random text|  0| 
|  2|random text|random text|  0| 
|  2|random text|random text|  0| 
+---------+-----------+-----------+------+ 
+0

うわー、魅力的な...ありがとう! – ghukill

関連する問題