私はpyspark Dataframeを持っていますが、今度は各行を繰り返し、mongoDBコレクションに挿入/更新したいと思います。pyspark Dataframeを反復処理し、次に各行に対してmongoDBと対話します
#Did every required imports
#dataframe
+---+----+
|age|name|
+---+----+
| 30| c|
| 5| e|
| 6| f|
+---+----+
db = mongodbclient['mydatabase']
collection = db['mycollection']
#created below function to insert/update
def customFunction(row):
key = {'name':row.name}
data = dict(zip(columns,[row.x for x in columns]))
collection.update(key, data, {upsert:true})
#return a_flag #commented it as of now, a_flag can be 0 or 1
名前がmongoDBコレクション 'mycollection'に存在する場合は、その行/レコードを更新し、そうでない場合はその新しいレコードを挿入する必要があります。
私は火花データフレームの上に
result = my_dataframe.rdd.map(customFunction)
#.....TypeError: can't pickle _thread.lock objects....
#AttributeError: 'TypeError' object has no attribute 'message'
をこの機能をマップしようとしたとき、誰もが「どこにもその機能にここに間違っている、および/または何把握していただけますかあれば提案してください、次のエラーを取得しています他の選択肢は、このタイプのタスクです。
基本的に(??コレクトコールなく、そのことも可能である)
を各行を反復し、各行の外スパーク作業を実行するための機能を適用します。
、事前に感謝を提案してください。.. :) MongoDBの
でマイデータに
name age
a 1
b 2
c 3 #new update should make age as 30 and 2 more new recs should inserted
データセット「my_dataframe」のサイズはどれくらいですか?輸出は並行して実施する必要がありますか?いくつのレコードを更新する必要があるかによって、必要なものを達成するには少なくとも3つの有効な方法があるからです。 – Mariusz
@Mariusz:mongoDBのBase_collectionには150mln +レコードがあり、spark-dataframeには500000レコード以下のインクリメンタルデータがあります。私は何が利用可能なオプションがあるか教えてください。 – Satya
@マリアス:私は一般的に、パンダのデータフレームを収集したり、それをMongoDBにアップコンバートしたりすることを好まない。 – Satya