は私がpysparkデータフレーム(DF)を持っていると仮定します。Pyspark DATAFRAME選択行
-----------------------------
record_id | foo | bar
-----------------------------
1 | random text | random text
2 | random text | random text
3 | random text | random text
1 | random text | random text
2 | random text | random text
-----------------------------
私の最終目標は、.write.jdbc()
でMySQLにこれらの行を記述することで、どのIうまくいっている。しかし、これを行う前に、という新しい列をrecord_id
列の一意性に基づいて追加してください。
私は似たようなユニークなrecord_id
年代を特定する進歩の少し加えました:
df.select('record_id').distinct().rdd.map(lambda r: r[0])
をしかし、パンダのデータフレームとは違って、私はこれは私が再利用できるインデックスを持っていると信じていない、それだけに表示されます値にする。私はまだSpark/Pysparkにはかなり新しいです。
次のワークフローを試してみてください。
- 別個
record_id
と行を識別、およびMySQL - に書き込みそして、残りの行を識別し、およびMySQL
それとも新たに追加して、元のDFを変更することも可能であるに書き込みますいくつかの連鎖コマンドに基づいて列unique
?私は、MySQLの卸売に書き込むことができ、以下のようなもの:
----------------------------------
record_id | foo | bar | unique
----------------------------------
1 | random text | random text | 0
2 | random text | random text | 0
3 | random text | random text | 1 # where 1 for boolean True
1 | random text | random text | 0
2 | random text | random text | 0
----------------------------------
任意の提案やアドバイスをいただければ幸いです!
うわー、魅力的な...ありがとう! – ghukill