2017-09-07 2 views
0

私はpyspark Dataframeを持っていますが、今度は各行を繰り返し、mongoDBコレクションに挿入/更新したいと思います。pyspark Dataframeを反復処理し、次に各行に対してmongoDBと対話します

#Did every required imports 
#dataframe 
+---+----+ 
|age|name| 
+---+----+ 
| 30| c| 
| 5| e| 
| 6| f| 
+---+----+ 
    db = mongodbclient['mydatabase'] 
    collection = db['mycollection'] 
    #created below function to insert/update 
    def customFunction(row): 
     key = {'name':row.name} 
     data = dict(zip(columns,[row.x for x in columns])) 
     collection.update(key, data, {upsert:true}) 
     #return a_flag #commented it as of now, a_flag can be 0 or 1 

名前がmongoDBコレクション 'mycollection'に存在する場合は、その行/レコードを更新し、そうでない場合はその新しいレコードを挿入する必要があります。

私は火花データフレームの上に

result = my_dataframe.rdd.map(customFunction) 
#.....TypeError: can't pickle _thread.lock objects.... 
#AttributeError: 'TypeError' object has no attribute 'message' 

をこの機能をマップしようとしたとき、誰もが「どこにもその機能にここに間違っている、および/または何把握していただけますかあれば提案してください、次のエラーを取得しています他の選択肢は、このタイプのタスクです。

基本的に(??コレクトコールなく、そのことも可能である)

を各行を反復し、各行の外スパーク作業を実行するための機能を適用します。

、事前に感謝を提案してください。.. :) MongoDBの

マイデータに

name age 
a 1 
b 2 
c 3 #new update should make age as 30 and 2 more new recs should inserted 
+0

データセット「my_dataframe」のサイズはどれくらいですか?輸出は並行して実施する必要がありますか?いくつのレコードを更新する必要があるかによって、必要なものを達成するには少なくとも3つの有効な方法があるからです。 – Mariusz

+0

@Mariusz:mongoDBのBase_collectionには150mln +レコードがあり、spark-dataframeには500000レコード以下のインクリメンタルデータがあります。私は何が利用可能なオプションがあるか教えてください。 – Satya

+0

@マリアス:私は一般的に、パンダのデータフレームを収集したり、それをMongoDBにアップコンバートしたりすることを好まない。 – Satya

答えて

1

を接続オブジェクトを漬けすることができないように見えます。

def customFunction(rows): 
    db = mongodbclient['mydatabase'] 
    collection = db['mycollection'] 

    for row in rows: 
     key = {'name':row.name} 
     data = dict(zip(columns,[row.x for x in columns])) 
     collection.update(key, data, {upsert:true}) 

my_dataframe.rdd.foreachPartition(customFunction) 

が、致命的な障害が矛盾した状態でデータベースを残す可能性があることに注意してください:私はforeachPartitionを使用すると思います。

1

MongoDBに500KBのレコードをアップしておくと、これを処理するためには、おそらくバルクモードが有効でしょう。 mongoDB内でリクエストを実行するには、あなたが実際に(要求を作成するだけで)実行するよりもはるかに多くの電力が必要であり、これを並列に実行するとmongo側が不安定になる可能性があります(反復的なアプローチよりも遅くなります)。

次のコードを試すことができます。 collect()を使用しないので、ドライバでメモリ効率が良いです。

bulk = collection.initialize_unordered_bulk_op() 
for row in rdd.toLocalIterator(): 
    key = {'name':row.name} 
    data = dict(zip(columns,[row.x for x in columns])) 
    bulk.update(key, data, {upsert:true}) 

print(bulk.execute()) 
関連する問題