2012-02-12 5 views
5

数日前にスレッドSO私は複数のHTTPを構造化するためのパラダイムをデザインする手助けについてここでスループットの違いコルーチンを使用して対私は上の質問をした

を要求したシナリオです。私は、マルチプロデューサー、マルチコンシューマーシステムを持っていたいと思います。私のプロデューサーはいくつかのサイトをクロールして掻き集め、見つかったリンクをキューに追加します。私は複数のサイトをクロールするので、私は複数のプロデューサー/クローラーを持っていたいと思います。

コンシューマ/ワーカーはこのキューをフィードフォワードし、これらのリンクにTCP/UDP要求を行い、その結果を私のDjango DBに保存します。また、各キュー項目が互いに完全に独立しているため、複数の作業者が必要です。

人々には、このために、すなわちGeventまたはEventletにコルーチンライブラリを使用することをお勧めします。コルーチンを使ったことは一度もありませんでしたが、プログラミングパラダイムはスレッド型のパラダイムに似ていますが、スレッドは1つだけが積極的に実行されていますが、I/Oコールなどブロックコールが発生するとスタックはメモリ内で、スレッドは何らかの種類のI/O呼び出しを検出するまで引き継ぎます。うまくいけば、私はこの権利を持っていますか?ここに私のSOの記事の1からのコードがあります:sleep呼び出しが呼び出しをブロックしたときにsleepイベントが発生しているので

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 


def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 


def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

# This doesn't work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

# Uncommenting this makes this script work. 
# producer() 

q.join() 

これはうまく機能し、他の緑のスレッドが処理を引き継ぎます。これは逐次実行よりもはるかに高速です。 ご覧のとおり、私のプログラムには意図的に別のスレッドへのスレッドの実行をもたらすコードはありません。私はすべてのスレッドを同時に実行させたいので、これが上記のシナリオにどのように適合するのか見当たりません。

すべてが正常に動作しますが、私はGevent/Eventletsがオリジナル順次実行中のプログラムよりも高いが、実際のスレッドを使用して達成することができるものよりも大幅に低い使用して達成したスループットを感じます。

私は生産者と消費者のそれぞれが同時にコルーチンのような内外のスタックを交換することなく作業することができ、スレッド化メカニズムを使用して再実装私のプログラムにした場合。

これはスレッドを使用して再実装する必要がありますか?私のデザインは間違っていますか?私はコルーチンを使うことの本当のメリットを見逃しました。

多分私のコンセプトは少し濁っているかもしれませんが、これは私が同化したものです。私のパラダイムと概念の助けや明確化はすばらしいことでしょう。あなたは非常に多くの(緑)のスレッドを持っているとき

おかげ

+0

なぜ複数のプロセスを使用しないのですか? –

+0

マルチスレッドとマルチ処理の長所と短所が分からないので、大丈夫かどうかわかりません。 –

+1

グローバルインタプリタロックのために、C拡張(または重量のあるOSプロセス)に頼ることなく、Pythonプログラムで「本当のスレッディング」(実際のOSスレッドは常に1つだけ実行されます)はありません。 –

答えて

5

あなたが見ることができるように、私は意図的 が別のスレッドに1つのスレッドの実行を生み出す私のプログラムで任意のコードを持っていません。私はすべて のスレッドを同時に実行させたいので、これは上記のシナリオにどのように適合するのかを見逃しています。

単一のOSスレッドがありますが、いくつかのグリーンレットがあります。あなたのケースでは、gevent.sleep()は、ワーカーが同時に実行できるようにします。 urllib2.urlopen(url).read()などのIOコールをブロックする場合は、urllib2パッチを適用してgeventgevent.monkey.patch_*()と呼ぶ)を使用する場合と同じ処理を行います。

A Curious Course on Coroutines and Concurrencyも参照して、コードが単一のスレッド環境でどのように同時に動作するかを理解してください。

は、あなたがコードを書くことができ、マルチプロセッシング、gevent、スレッド間のスループットの違いを比較するために、すべてのaproachesと互換性のこと:

#!/usr/bin/env python 
concurrency_impl = 'gevent' # single process, single thread 
##concurrency_impl = 'threading' # single process, multiple threads 
##concurrency_impl = 'multiprocessing' # multiple processes 

if concurrency_impl == 'gevent': 
    import gevent.monkey; gevent.monkey.patch_all() 

import logging 
import time 
import random 
from itertools import count, islice 

info = logging.info 

if concurrency_impl in ['gevent', 'threading']: 
    from Queue import Queue as JoinableQueue 
    from threading import Thread 
if concurrency_impl == 'multiprocessing': 
    from multiprocessing import Process as Thread, JoinableQueue 

スクリプトの残りの部分は、すべての同時実行の実装のための同じである:

def do_work(wid, value): 
    time.sleep(random.randint(0,2)) 
    info("%d Task %s done" % (wid, value)) 

def worker(wid, q): 
    while True: 
     item = q.get() 
     try: 
      info("%d Got item %s" % (wid, item)) 
      do_work(wid, item) 
     finally: 
      q.task_done() 
      info("%d Done item %s" % (wid, item)) 

def producer(pid, q): 
    for item in iter(lambda: random.randint(1, 11), 10): 
     time.sleep(.1) # simulate a green blocking call that yields control 
     info("%d Added item %s" % (pid, item)) 
     q.put(item) 
    info("%d Signal Received" % (pid,)) 

モジュールレベルでコードを実行しないでください。main()に入れてください:

def main(): 
    logging.basicConfig(level=logging.INFO, 
         format="%(asctime)s %(process)d %(message)s") 

    q = JoinableQueue() 
    it = count(1) 
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)] 
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)] 
    for t in producers+workers: 
     t.daemon = True 
     t.start() 

    for t in producers: t.join() # put items in the queue 
    q.join() # wait while it is empty 
    # exit main thread (daemon workers die at this point) 

if __name__=="__main__":  
    main() 
+0

こんにちはSebastian、私は自分のコードを見て、私の生産者と消費者が同時に働いているのを見ました。私の小児血小板の1つにおいてブロッキング操作が起こると、他の小児血糖コントロールを生じる。私はソケットモジュールがブロックされていないように、 'monkey_patch'呼び出しを追加しましたが、私はプロセッサーを十分に処理できません。通常のPCには、より多くの同時接続とより多くのグリーンレットを持つのに十分なジュースがありますが、十分な速度が得られません。私は非常に紛失し、なぜそれがより多くのプロセッサを使用せず、より速く動作するのかと混同します。お願いします。私はとても失われています。ありがとう。 –

+0

@Mridang Agarwalla:質問に投稿したコードにコメントしました。 「生産者」*はそれと同時に働かない。 – jfs

+1

@Mridang Agarwalla:あなたの問題がIOバウンド(ディスク、ネットワーク)の場合、CPUがどれほど高速かどうかは関係ありません。たとえば、ディスクに50MB /秒しか書き込めない場合は、CPUがプロセス1GB /秒。また、あなたのプログラムは、開いているファイルの数などの他の有限のリソースを消費することができます。 'gevent'を使用すると、すべてのブロッキング呼び出しが"緑色 "であることを確認します。つまり、データベースドライバが' gevent'と互換性がないなど、ブロックされません。 – jfs

1

geventは素晴らしいです。私は何千というものでそれをテストし、それは非常にうまくいった。スクラップとdbへの保存の両方に使用するライブラリがすべて緑色になっていることを確認してください。彼らがPythonのソケットを使用している場合afaik、gevent注入は動作する必要があります。 Cで書かれた拡張機能(例えばmysqldb)はブロックされ、代わりに緑色の等価物を使う必要があります。

geventを使用すると、ほとんどすべてのタスクで新しい(緑色の)スレッドが生成され、スレッドはdb.save(web.get(address))という単純なコードになります。 DBまたはWebブロックのライブラリがある場合、geventはプリエンプションを処理します。それはあなたの仕事が記憶に収まる限り働きます。この場合

0

、あなたの問題は、プログラム速度(geventまたはスレッドのすなわち選択)が、ネットワークIOのスループットではありません。これは、プログラムの実行速度を決定するボトルネックです(そうすべきです)。

Geventは、であり、プログラムのアーキテクチャではないことを確認する良い方法です。

これは、あなたがしたいと思うプロセスの一種である:

import gevent 
from gevent.queue import Queue, JoinableQueue 
from gevent.monkey import patch_all 


patch_all() # Patch urllib2, etc 


def worker(work_queue, output_queue): 
    for work_unit in work_queue: 
     finished = do_work(work_unit) 
     output_queue.put(finished) 
     work_queue.task_done() 


def producer(input_queue, work_queue): 
    for url in input_queue: 
     url_list = crawl(url) 
     for work in url_list: 
      work_queue.put(work) 
     input_queue.task_done() 


def do_work(work): 
    gevent.sleep(0) # Actually proces link here 
    return work 


def crawl(url): 
    gevent.sleep(0) 
    return list(url) # Actually process url here 

input = JoinableQueue() 
work = JoinableQueue() 
output = Queue() 

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)] 
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)] 


list_of_urls = ['foo', 'bar'] 

for url in list_of_urls: 
    input.put(url) 

# Wait for input to finish processing 
input.join() 
print 'finished producing' 
# Wait for workers to finish processing work 
work.join() 
print 'finished working' 

# We now have output! 
print 'output:' 
for message in output: 
    print message 
# Or if you'd like, you could use the output as it comes! 

あなたは、私はちょうどここにいることを実証してきました、終了する入力作業キューを待つ必要はありません。

関連する問題