2016-12-12 19 views
2

MongoDBでたくさんの挿入と更新が必要です。Pymongoマルチプロセッシング

私はこれらのタスクを行うためにマルチプロセッシングをテストしようとしています。このため私はこの簡単なコードを作成しました。私のダミーデータは次のとおりです。

documents = [{"a number": i} for i in range(1000000)] 

マルチプロセッシングなし:

time1s = time.time() 
client = MongoClient() 
db = client.mydb 
col = db.mycol 
for doc in documents: 
    col.insert_one(doc) 
time1f = time.time() 
print(time1f-time1s) 

私は150秒を得ました。

マルチプロセッシングでは、必要に応じてPymongo's FAQsに記述されている以下のワーカー関数を定義しました。私は自分のコードを実行するとき

def insert_doc(document): 
    client = MongoClient() 
    db = client.mydb 
    col = db.mycol 
    col.insert_one(document) 

しかし、:

pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 99] Cannot assign requested address

26447のドキュメントの合計が、エラーが発生した前処理した:

time2s = time.time() 
pool = mp.Pool(processes=16) 
pool.map(insert_doc, documents) 
pool.close() 
pool.join() 
time2f = time.time() 
print(time2f - time2s) 

を私はエラーを取得します。このエラーはhereで説明されていますが、そのエラーに遭遇した人はマルチプロセッシングを使用しませんでした。解決策はただ1つのMongoClientを開くことでしたが、これはマルチプロセッシングをしたいときは不可能です。回避策はありますか?ご協力いただきありがとうございます。

答えて

4

あなたのコードでは、あなたの例の100万件のドキュメントごとに新しいMongoClientが作成されます(リンク先の質問と同じです)。これには、新しいクエリごとに新しいソケットを開く必要があります。これはPyMongoの接続プーリングを無効にしますが、極端に遅いだけでなく、TCPスタックよりも速くソケットを開閉することもできます。TIME_WAIT状態でソケットをあまりにも多く残してしまい、最終的にポートが使い果たされます。

あなたが各クライアントとの多数の文書を挿入する場合は、少数のクライアントを作成するため、少数のソケットを開くことができます。

import multiprocessing as mp 
import time 
from pymongo import MongoClient 

documents = [{"a number": i} for i in range(1000000)] 

def insert_doc(chunk): 
    client = MongoClient() 
    db = client.mydb 
    col = db.mycol 
    col.insert_many(chunk) 

chunk_size = 10000 

def chunks(sequence): 
    # Chunks of 1000 documents at a time. 
    for j in range(0, len(sequence), chunk_size): 
     yield sequence[j:j + chunk_size] 

time2s = time.time() 
pool = mp.Pool(processes=16) 
pool.map(insert_doc, chunks(documents)) 
pool.close() 
pool.join() 
time2f = time.time() 
print(time2f - time2s) 
+0

を** 'col.insert_many()'ここではトリックです**場合。 'chunk'を繰り返し実行し、' col.insert() 'を使うとtcp_socketを使い果たし、' _get_socket_no_auth:sock_info、from_pool = self.sockets.popの '空のセットから' KeyError: 'popを投げます()、True'である。他のオプションは、[Bulk Write Operations](http://api.mongodb.com/python/current/examples/bulk.html)を使用することです。 – Abhijeet