2017-01-31 16 views
1

特定のIOバウンドタスクを実行するスクリプトの作成に取り組んでいます。大量のデータセットをダウンロードし、データそのものを破棄する前にサイズに関する情報を記録する必要があります。並行処理を理解する

問題は、このデータを取得しているソースにcontent-lengthヘッダーがないため、ファイルサイズがどれくらい大きいかを事前に知ることができないことです。これには、ダウンロードプロセスの所要時間を監視し、そのプロセスを強制終了し、長時間(例えば60秒以上)かかる場合に他のプロセスに継続する方法があることが必要です。これは、非常に大きなデータセットに「スタック」するのを避けるために必要です。

requestsはこの機能を内蔵しておらず、ソリューションの検索に多くの時間を費やした後、pebbleライブラリを使用して同時処理をタイムアウトで実行することに決めました。私の理解は、標準のlib multiprocessingモジュールの小さな拡張機能で、いくつかの安全機能、つまりエラー処理とタイムアウト(私が望むもの)を追加したものです。 Process pool例に基づいて

は、ここに私のコードです:

try: 
    with ProcessPool(max_workers=4) as pool: 
     iterator = pool.map(get_data, process_tuples[3:6], timeout=10) 

     while True: 
      try: 
       rows, cols, filesize, i = next(iterator) 
       datasets[i]['rows'] = rows 
       datasets[i]['columns'] = cols 
       datasets[i]['filesize'] = filesize 
      except TimeoutError as error: 
       print("Function took longer than %d seconds. Skipping responsible endpoint..." % error.args[1]) 
      except StopIteration: 
       break 
finally: 
    with open("../../../data/" + FILE_SLUG + "/glossaries/geospatial.json", "w") as fp: 
     json.dump(datasets, fp, indent=4) 

しかし、これは、2つの方法で期待される動作から分岐:

  1. 私はtimeout=10は、個々の時間の量を制限することを考えていましたダウンロードプロセス(完了:get_data)がかかります。しかし、これを大きなファイルで実行すると、TimeoutErrorというメッセージが表示され、処理に30秒以上かかることがあります。 30は私の入力の3倍です。それは私が欲しいものではありません。そこには何が起こっているのですか?
  2. TimeoutErrorが呼び出されたとき、その実行を破棄して次のブロックに移動する代わりに、プロセスはfinallyブロックにジャンプします。私はこれが私の最初の質問に対する答えの結果だと思う。
+0

セット 'chunksize'を使った例だとあなたが希望'タイムアウトを取得します可能性がどちらか'の10秒。ライブラリはあなたの質問の第2のポイントをまだサポートしていません。そのようなユースケースをサポートするために、私はすぐにそのロジックを再構築します。 – noxdafox

答えて

1

実際requestsにあなたはstream=Trueを設定し、さらにワークフローを制御するためにResponse.iter_content()を使用することができます。そこで、基本的

import time 
import requests 

def get_content(url, timeout): 
    """ 
    Get response data from url before timeout 
    """ 
    start = time.time() 
    data = '' 
    response = requests.get(url, stream=True) 

    for chunk in response.iter_content(chunk_size = 1024): # You can set a bigger chunk_size for less iterations 
     if (time.time() - start) > timeout: 
      response.close() 
      return {'TimedOut': True, 'data': None} 
     else: 
      data += chunk 

    response.close() 
    return {'TimedOut': False, 'data': data} 

をあなたはtimeout値を設定し、データが大きすぎる場合、またはネットワークがある:応答データを反復/ダウンロードしながら、あなたの場合は

は、我々は、経過時間を追跡することができ遅すぎると、 timeout以上を費やすと結果が返され、それらの不完全なデータはガベージコレクションされます。それはIOバウンドのタスクなので

次に、私たちは、仕事を終えるためにthreadingまたはmultiprocessingを使用し、ここ1にthreading

import threading, Queue 

def worker(queue): 
    while not queue.empty(): 
     url = queue.get() 

     result = get_content(url, 60) 

     # Do other stuff 

if __name__ == '__main__': 
    limit = 10 # number of threads to use 
    thread_pool = [None] * limit 
    queue = Queue.Queue() 
    urls = ['xxxx', 'xxxxx'] 

    for url in urls: 
     queue.put(url) 

    for thread in thread_pool: 
     thread = threading.Thread(target=worker, args=(queue,)) 
     thread.start()