2017-07-21 6 views
0

例外があります。すべての私は、プール・カウントにPythonマルチプロセッシング:プールサイズを大きくした後の壊れたパイプ例外

コード

def parse(url): 
    r = request.get(url) 
POOL_COUNT = 75 
with Pool(POOL_COUNT) as p: 
    result = p.map(parse, links) 



File "/usr/lib64/python3.5/multiprocessing/pool.py", line 130, in worker 
    put((job, i, (False, wrapped))) 
    File "/usr/lib64/python3.5/multiprocessing/queues.py", line 355, in put 
    self._writer.send_bytes(obj) 
    File "/usr/lib64/python3.5/multiprocessing/connection.py", line 200, in send_bytes 
    self._send_bytes(m[offset:offset + size]) 
    File "/usr/lib64/python3.5/multiprocessing/connection.py", line 404, in _send_bytes 
    self._send(header + buf) 
    File "/usr/lib64/python3.5/multiprocessing/connection.py", line 368, in _send 
    n = write(self._handle, buf) 
BrokenPipeError: [Errno 32] Broken pipe 
Process ForkPoolWorker-26: 
Traceback (most recent call last): 
    File "/usr/lib64/python3.5/multiprocessing/pool.py", line 125, in worker 
    put((job, i, result)) 
    File "/usr/lib64/python3.5/multiprocessing/queues.py", line 355, in put 
    self._writer.send_bytes(obj) 
    File "/usr/lib64/python3.5/multiprocessing/connection.py", line 200, in send_bytes 
    self._send_bytes(m[offset:offset + size]) 
    File "/usr/lib64/python3.5/multiprocessing/connection.py", line 404, in _send_bytes 
    self._send(header + buf) 
    File "/usr/lib64/python3.5/multiprocessing/connection.py", line 368, in _send 
    n = write(self._handle, buf) 
BrokenPipeError: [Errno 32] Broken pipe 
+0

私は試して複製するのに十分なコードを投稿できますか? –

+0

インスタンス化されたプールの数を変更しますか?説明してください! 'map_async'を使うと、使用しているprocを数えずにプールを使うことができます:[例](https://stackoverflow.com/questions/4413821/multiprocessing-pool-example#4415314) – mquantin

+0

@ XingzhouLiu質問は更新されました – Volatil3

答えて

0

を増加させたことを、あなたのコードのこの単純なバージョンでは、それはしないPOOL_COUNT

、任意の数の
from multiprocessing import Pool 
def parse(url): 
    r = url 
    print(r) 

POOL_COUNT = 90 
with Pool(processes=POOL_COUNT) as p: 
    links = [str(i) for i in range(POOL_COUNT)] 
    result = p.map(parse, links) 

で、ここで完璧に動作したのですか? 問題はrequestの部分にあるはずです。おそらくsleepが必要ですか?

+0

すでに20 +秒の睡眠と同様に、コードにはDbのやりとりも含まれています..パイプの破損はプール数とは関係ありません。 – Volatil3

0

私は壊れたパイプ例外も見ていました。しかし、私はもっと複雑です。

プールサイズを増やすだけでは例外が発生する理由の1つは、リクエストモジュールにあまりにも多くのものがあるため、メモリが不足することがあるからです。それからあなたは小さなスワップを持っている特にseg-faultになります。

Edit1:メモリ使用量に起因すると思います。プール接続が多すぎるとメモリが多すぎるため、最終的に壊れてしまいます。私は小さなRAMと大きなパッケージを持っているので、私は自分のプールのサイズを4に制限しました。

0

私は、あなたがrequests libraryを使用していると仮定すると、((あなたが要求に.getでsを逃したことに注意)次のスクリプトをAWS上で(あなたが説明するように2ギガバイトRAM)t2.smallインスタンスを再現してみましたそしてまたreturn)が欠落していた:あなたがやったよう

from multiprocessing import Pool 
import requests 
def parse(url): 
    a = requests.get(url) 
    if a.status_code != 200: 
    print(a) 
    return a.text 
POOL_COUNT = 120 
links = ['http://example.org/' for i in range(1000)] 
with Pool(POOL_COUNT) as p: 
    result = p.map(parse, links) 
print(result) 

悲しいことに、私は同じ問題に遭遇しませんでした。

あなたが掲示したスタックトレースから、問題はparse機能を起動することにあり、要求モジュール自体ではないようです。メインプロセスは、起動したプロセスの1つにデータを送信できないようです。

とにかく:この操作はCPUバインドではなく、ボトルネックはネットワーク(おそらくリモートサーバーの最大接続数、または多分おそらく)です。マルチスレッドを使用するほうがはるかに優れています。 multiprocessing.mapはプロセス間で通信する必要があるため、parseの戻り値をpickleしてメインプロセスに送信する必要があるため、これはおそらく高速です。

プロセスの代わりにスレッドを試すには、from multiprocessing.pool import ThreadPoolを実行し、PoolThreadPoolに置き換えます。

関連する問題