特定のIOバウンドタスクを実行するスクリプトの作成に取り組んでいます。大量のデータセットをダウンロードし、データそのものを破棄する前にサイズに関する情報を記録する必要があります。並行処理を理解する
問題は、このデータを取得しているソースにcontent-length
ヘッダーがないため、ファイルサイズがどれくらい大きいかを事前に知ることができないことです。これには、ダウンロードプロセスの所要時間を監視し、そのプロセスを強制終了し、長時間(例えば60秒以上)かかる場合に他のプロセスに継続する方法があることが必要です。これは、非常に大きなデータセットに「スタック」するのを避けるために必要です。
requests
はこの機能を内蔵しておらず、ソリューションの検索に多くの時間を費やした後、pebble
ライブラリを使用して同時処理をタイムアウトで実行することに決めました。私の理解は、標準のlib multiprocessing
モジュールの小さな拡張機能で、いくつかの安全機能、つまりエラー処理とタイムアウト(私が望むもの)を追加したものです。 Process pool例に基づいて
try:
with ProcessPool(max_workers=4) as pool:
iterator = pool.map(get_data, process_tuples[3:6], timeout=10)
while True:
try:
rows, cols, filesize, i = next(iterator)
datasets[i]['rows'] = rows
datasets[i]['columns'] = cols
datasets[i]['filesize'] = filesize
except TimeoutError as error:
print("Function took longer than %d seconds. Skipping responsible endpoint..." % error.args[1])
except StopIteration:
break
finally:
with open("../../../data/" + FILE_SLUG + "/glossaries/geospatial.json", "w") as fp:
json.dump(datasets, fp, indent=4)
しかし、これは、2つの方法で期待される動作から分岐:
- 私は
timeout=10
は、個々の時間の量を制限することを考えていましたダウンロードプロセス(完了:get_data
)がかかります。しかし、これを大きなファイルで実行すると、TimeoutError
というメッセージが表示され、処理に30秒以上かかることがあります。 30は私の入力の3倍です。それは私が欲しいものではありません。そこには何が起こっているのですか? TimeoutError
が呼び出されたとき、その実行を破棄して次のブロックに移動する代わりに、プロセスはfinally
ブロックにジャンプします。私はこれが私の最初の質問に対する答えの結果だと思う。
セット 'chunksize'を使った例だとあなたが希望'タイムアウトを取得します可能性がどちらか'の10秒。ライブラリはあなたの質問の第2のポイントをまだサポートしていません。そのようなユースケースをサポートするために、私はすぐにそのロジックを再構築します。 – noxdafox