2013-04-03 6 views
5

ジョブのキューがあり、ワーカーはこれらのジョブを1つずつ処理します。各ジョブでは、データをフォーマットしてHTTP POSTリクエストを発行し、データをリクエストペイロードとして発行する必要があります。非同期HTTPリクエストを一度に1つずつPythonで送信するにはどうすればよいですか?

これらのHTTP POST要求を非スレッドでシングルスレッドで非同期的に発行するにはどうすればよいですか?要求からの応答は気にしません。私たちが望むのは、要求ができるだけ早く実行され、その後すぐに次の仕事に移行することだけです。

geventおよびgrequestsライブラリー(Why does gevent.spawn not execute the parameterized function until a call to Greenlet.join?参照)を使用して検討しました。

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.post, url, params=params) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 

最初のprint文を実行するが、第二と第三print文が印刷されない飽きないとURLがヒットされることはありません。私たちの労働者のコードは次のようになります。

これらの非同期要求をどのように実行させることができますか?

+0

[asyncore](http://docs.python.org/2/library/asyncore.html)と呼ばれる標準のライブラリがありますが、使用するケースが低すぎる可能性があります。 – lucasg

+0

私はこれに@georgeslと同意する必要があります。asyncoreは、後で開発するためにアプリケーションよりも優れた柔軟性を与えるため、移行するのに最適な場所です。また、http:// stackoverflow.com/questions/15753901/python-asyncore-client-socket-can-not-determaine-connection-status/15754244#15754244'の使い方の良いスタートと例です私の質問に対する答えを見てください)。そうでない場合は、実際に複数のプロセスで実行する必要があります。たとえ、Pythonの "サブ"ライブラリでも、マルチプロセスの要点であるパラレルを送ることができます。 – Torxed

+0

あなたのgeventコードは大丈夫ですクイックテストではうまくいくと私にはわかります;私はgevent 1.0b3を使います)。私は 'execute_task'が呼び出されるコンテキストに依存すると思います。 – robertklep

答えて

1

1)Queue.Queueオブジェクト

2作る)あなたとループなどの多くの「労働者」のスレッドを作成し、Queue.Queue

3から読み出さは)キューにジョブを養います。キュー

ワーカースレッドは、彼らがそれをファイルから行を読み取り、Queue.Queue

でそれらを置く

例を配置しているために、Queue.Queueを読み取るます

import sys 
import urllib2 
import urllib 
from Queue import Queue 
import threading 
import re 

THEEND = "TERMINATION-NOW-THE-END" 


#read from file into Queue.Queue asynchronously 
class QueueFile(threading.Thread): 
    def run(self): 
     if not(isinstance(self.myq, Queue)): 
      print "Queue not set to a Queue" 
      sys.exit(1) 
     h = open(self.f, 'r') 
     for l in h: 
      self.myq.put(l.strip()) # this will block if the queue is full 
     self.myq.put(THEEND) 

    def set_queue(self, q): 
     self.myq = q 

    def set_file(self, f): 
     self.f = f 

ワーカースレッドがそのインスタンス

の「スタート」を呼び出し、threading.Threadに基づいてオブジェクトを行かせるオブジェクトを作成するには(例のみ)

class myWorker(threading.Thread): 
    def run(self): 
     while(running):   
      try: 
       data = self.q.get() # read from fifo 

       req = urllib2.Request("http://192.168.1.10/url/path") 
       req.add_data(urllib.urlencode(data)) 
       h1 = urllib2.urlopen(req, timeout=10) 
       res = h1.read() 
       assert(len(res) > 80) 

      except urllib2.HTTPError, e: 
       print e 

      except urllib2.URLError, e: 
       print "done %d reqs " % n 
       print e 
       sys.exit() 

ようなものかもしれないもののアイデア

1

別のスレッドで実行するか、組み込みのasyncoreライブラリを使用する必要があります。 ほとんどのライブラリは、あなたが知らなくてもスレッドを利用したり、Pythonの標準的な部分であるasyncoreに依存します。ここで

は、スレッドとasyncoreの組み合わせです:

#!/usr/bin/python 
# -*- coding: iso-8859-15 -*- 
import asyncore, socket 
from threading import * 
from time import sleep 
from os import _exit 
from logger import * # <- Non-standard library containing a log function 
from config import * # <- Non-standard library containing settings such as "server" 

class logDispatcher(Thread, asyncore.dispatcher): 
    def __init__(self, config=None): 
     self.inbuffer = '' 
     self.buffer = '' 
     self.lockedbuffer = False 
     self.is_writable = False 

     self.is_connected = False 

     self.exit = False 
     self.initated = False 

     asyncore.dispatcher.__init__(self) 
     Thread.__init__(self) 

     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 
     try: 
      self.connect((server, server_port)) 
     except: 
      log('Could not connect to ' + server, 'LOG_SOCK') 
      return None 

     self.start() 

    def handle_connect_event(self): 
     self.is_connected = True 

    def handle_connect(self): 
     self.is_connected = True 
     log('Connected to ' + str(server), 'LOG_SOCK') 

    def handle_close(self): 
     self.is_connected = False 
     self.close() 

    def handle_read(self): 
     data = self.recv(8192) 
     while self.lockedbuffer: 
      sleep(0.01) 

     self.inbuffer += data 


    def handle_write(self): 
     while self.is_writable: 
      sent = self.send(self.buffer) 
      sleep(1) 

      self.buffer = self.buffer[sent:] 
      if len(self.buffer) <= 0: 
       self.is_writable = False 
      sleep(0.01) 

    def _send(self, what): 
     self.buffer += what + '\r\n' 
     self.is_writable = True 

    def run(self): 
     self._send('GET/HTTP/1.1\r\n') 

while 1: 
    logDispatcher() # <- Initate one for each request. 
    asyncore.loop(0.1) 
    log('All threads are done, next loop in 10', 'CORE') 
    sleep(10) 

それとも、単に仕事をして、その後、死亡したスレッドを行うことができます。

from threading import * 
class worker(Thread): 
    def __init__(self, host, postdata) 
     Thread.__init__(self) 
     self.host = host 
     self.postdata = postdata 
     self.start() 
    def run(self): 
     sock.send(self.postdata) #Pseudo, create the socket! 

for data in postDataObjects: 
    worker('example.com', data) 

(あなたは5kの支柱の上、送信しているか、それがシステムに負担をかけてしまうかもしれません場合)は、スレッドの数を制限する必要がある場合だけwhile len(enumerate()) > 1000: sleep(0.1)を行うと、ルーパーオブジェクトは、数のスレッドを待ちましょう消える。

0

あなたのurlとparamsをリストにラップしてから、一度ペアになってタスクプールに1回ポップします(ここのタスクプールは1つのタスクを持つか空です)。スレッドを作成し、タスクプールからタスクを読み込みます。 1つのスレッドがタスクを取得してリクエストを送信し、リストから別の1つをポップアウトします(実際にはキューリスト)。

1

sleepの代わりにjoinメソッドを使用してステータスを確認することができます。問題を解決するために一度に1つずつ実行したい場合。それをテストするためにあなたのコードを少し修正することはうまくいくようです。

import gevent 
import requests 

def execute_task(worker, job): 

    print "About to spawn request" 
    greenlet = gevent.spawn(requests.get, 'http://example.com', params={}) 

    print "Request spawned, about to call sleep" 
    gevent.sleep() 

    print "Greenlet status: ", greenlet.ready() 
    print greenlet.get() 

execute_task(None, None) 

は結果を与える:

About to spawn request 
Request spawned, about to call sleep 
Greenlet status: True 
<Response [200]> 

は、より多くのこのgreenletを実行しているからGeventを遮断することができ、このPythonの過程でそこで起こっていますか?

関連する問題