2017-12-17 11 views
0

私はいくつかのタスクプロデューサ間で共有されたgevent.pool(固定サイズ)を持っています。空きスロットがある場合は、すべてのタスクプロデューサがプールに新しいGreenletを適用できます。タスクがプールに追加された後、タスクプロデューサは、追加されたすべてのタスクが完了するまで待機する必要があります。Gevent:完了したグリーンレットのセットを待つ方法

gevent.queue.JoinableQueueを使用して、すべてのタスクが完了するまで待機しようとしました。それは私が待っているの終わりに非常に迷惑な例外を得る以外は動作します。

これを避けるために、以下のコードを修正するにはどうすればよいですか? 私は何か間違っているかもしれませんか?

from gevent import monkey, sleep; monkey.patch_all() 
from gevent.queue import JoinableQueue 
from gevent.pool import Pool 

pool = Pool(3) 


def worker(n): 
    print 'Worker {} started'.format(n) 
    sleep(1) 
    print 'Worker {} finished'.format(n) 
    return n 


def main(): 
    results = [] 

    queue = JoinableQueue() 
    for job_no in range(5): 
     pool.wait_available() 
     greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret)) 
     queue.put(greenlet) 
     sleep(.05) 
    print 'All workers added' 

    queue.join() 
    print 'All workers finished', results 


if __name__ == '__main__': 
    main() 

出力:

Worker 0 started 
Worker 1 started 
Worker 2 started 
Worker 0 finished 
Worker 3 started 
Worker 1 finished 
Worker 4 started 
All workers added 
Worker 2 finished 
Worker 3 finished 
Worker 4 finished 
Traceback (most recent call last): 
    File "main.py", line 32, in <module> 
    main() 
    File "main.py", line 27, in main 
    queue.join() 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\queue.py", line 492, in join 
    return self._cond.wait(timeout=timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 219, in wait 
    return self._wait(timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 129, in _wait 
    gotit = self._wait_core(timeout) 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 106, in _wait_core 
    result = self.hub.switch() 
    File "C:\Python\2.7.10\x64\lib\site-packages\gevent\hub.py", line 630, in switch 
    return RawGreenlet.switch(self) 
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x26c1c28 select default pending=0 ref=0>) 

答えて

1

あなたがキューにタスクを消費する何greenletがないため、すべてのgreenletsが完了するまで、queue.join()だけブロックをエラー「この操作は永遠にブロックする」取得、例外育った。

JoinableQueueここで必要とされていない、完了するすべてのgreenletsを待つgevent.joinall()を使用します。

import gevent 

def main(): 
    results = [] 
    gs = [] 
    for job_no in range(5): 
     greenlet = .. 
     gs.append(greenlet) 
    gevent.joinall(gs) 
    print 'All workers finished', results 
関連する問題