2009-10-12 11 views
6

HTTPストリームに接続して消費するテキストデータを記録するクライアントがあります。httpストリームからの非ブロック読み取り/ログ

私はストリーミングサーバーにHTTP GETリクエストを送信します...サーバーはデータを返信し、継続的に公開します...テキストをパブリッシュするか、またはping(テキスト)メッセージを定期的に送信します。

消費するデータをノンブロッキングで読み込んでログに記録する必要があります。

私はこのような何かをしています:

import urllib2 

req = urllib2.urlopen(url)  
for dat in req: 
    with open('out.txt', 'a') as f:   
     f.write(dat) 

私の質問です:ストリームが連続しているとき
が、これは今までブロックされますか?
各チャンクでどれだけのデータが読み込まれ、それを指定/チューニングできますか?
これはHTTPストリームを読み書きする最も良い方法ですか?

答えて

3

ブロックサイズのブロックやバッファリングなどの問題を適切に管理するには、高レベルのインターフェイスを使用しています。非同期インターフェイス(すべてtwisted、既に提案されている)は、標準ライブラリにもありますが、なぜhttplibではないのですか? HTTPResponseインスタンス.read(amount)メソッドは、urlopenによって返されるオブジェクトの同様のメソッドよりも、amountバイトを読み込むために必要以上にブロックされない可能性が高くなります(どちらのモジュールでも、hmmm ...)。

6

ねえ、これは3つの質問です! ;-)

サーバーがデータを非常に迅速に生成していても、ネットワークのボトルネックが原因で読み取りがブロックされることがあります。

"for dat in req"を使用してURLデータを読み取ると、一度に1行ずつ読むことができます。画像などのバイナリデータを読み取っている場合は、あまり役に立ちません。使用する場合、より良いコントロールを得る

chunk = req.read(size) 

もちろんブロックすることができます。

それが最良の方法であるかどうかは、ご質問では利用できない詳細によって異なります。たとえば、ブロッキングコールを何も実行しないで実行する必要がある場合は、Twistedのようなフレームワークを検討する必要があります。あなたがブロックすることであなたを抑えて、Twisted(これはまったく新しいパラダイムです)を使いたくない場合は、スレッドをスピンアップして読み書きを行いますあなたのメインスレッドは、その陽気な方法になっている間のファイルは、:

明らか
def func(req): 
    #code the read from URL stream and write to file here 

... 

t = threading.Thread(target=func) 
t.start() # will execute func in a separate thread 
... 
t.join() # will wait for spawned thread to die 

、私は、エラーチェック/例外処理などを省略しましたが、うまくいけば、それはあなたの写真を与えるのに十分です。あなたはそれをブロックしますサーバーに追いつくとき、サーバはより多くのデータを生成するまで

各DATは最後に改行を含む1行になります

1

はいねじれ

良いオプションです私はあなたの例で、との周りを交換するだろう、あなたは本当に開き、到着するすべての行のファイルを閉じたいですか?

+0

for/with orderは意図的でした。これは、各書き込みでファイルハンドルをオープン/クローズします。ビジーなストリームでは効率的ではありませんが、私の場合、ストリームは主にブロック/待機して、データをログに記録することがあります。 –

3

もう1つの方法は、socketモジュールを直接使用することです。接続を確立し、HTTP要求を送信し、ソケットを非ブロッキングモードに設定して、socket.recv()(リソースが一時的に利用できない)例外(つまり、何も読み取らない)を処理してデータを読み取ります。あなたはデータがあるときは、あなたを教えてくれますselectモジュールを利用することができなどURLベースの基本認証を必要とする、Webサーバーがリダイレクト場合urllib.urlopen()はいくつかの利点を持っている、しかし

import socket, time 

BUFSIZE = 1024 

s = socket.socket() 
s.connect(('localhost', 1234)) 
s.send('GET /path HTTP/1.0\n\n') 
s.setblocking(False) 

running = True 

while running: 
    try: 
     print "Attempting to read from socket..." 
     while True: 
      data = s.recv(BUFSIZE) 
      if len(data) == 0:  # remote end closed 
       print "Remote end closed" 
       running = False 
       break 
      print "Received %d bytes: %r" % (len(data), data) 
    except socket.error, e: 
     if e[0] != 11:  # Resource temporarily unavailable 
      print e 
      raise 

    # perform other program tasks 
    print "Sleeping..." 
    time.sleep(1) 

:非常にラフな例がこれです読む。