2017-02-10 12 views
0

私は300万以上のアイテム行を持つtsvファイルを持っています。そこには、各Itemにid、group、およびurlがあり、groupカラムがソートされます。Pythonで数百万の画像URLを効率的にチェックする

すなわち

x1 gr1 {some url}/x1.jpg 
x2 gr1 {some url}/x2.jpg 
x3 gr2 {some url}/x1.jpg 

私はPythonスクリプトにそれをロードし、データベースにそれらの項目をロードする前に、グループのすべての項目のURLのステータス200 OKかどうかを確認する必要があります。私はプロセスを使用し、それぞれについてURLチェックを行うことを考えました(これはあまり良い経験ではありません)。

私のロジックatm:配列a1をgr1 - > a1の各項目を新しいプロセスに渡す - >プロセスが200をチェック - > a2の配列に入れる - > a1のすべての項目をチェックしてa2をDBにプッシュする(他のものと共に) - > repeat

これは100,000アイテムの場合、30分を要します。ボトルネックはURLチェックです。 URLをチェックせずに、スクリプトは比較して速いです。これまでのところ:

import csv 
import re 
import requests 
import multiprocessing 
from pymongo import MongoClient 
import sys 

#Load in Data 
f = open('../tsvsorttest.tsv', 'rb') 
reader = csv.reader(f, delimiter='\n') 

#Get the first group name 
currGroup = re.split(r'\t', next(reader)[0].decode('utf8'))[1] 
currGroupNum = 0 
items = [] 
checkedItems = [] 

#Method that checks the URL, if its 200, add to newItems 
def check_url(newitem): 
    if requests.get(newitem['image_url']).status_code is 200: 
     print('got an ok!') 
     checkedItems.append(newitem) 
    global num_left 
    num_left -= 1 


def clear_img(checkitems): 
    for thisItem in checkitems: 
     p = multiprocessing.Process(target=check_url(thisItem)) 
     p.start() 

#Start the loop, use i to keep track of the iteration count 
for i, utf8_row in enumerate(reader): 
    unicode_row = utf8_row[0].decode('utf8') 

    x = re.split(r'\t', unicode_row) 

    item = {"id": x[0], 
      "group": x[1], 
      "item_url": x[2] 
      } 
    if currGroup != x[1]: 
     y = len(items) 
     print('items length is ' + str(y)) 

     #Dont want single item groups 
     if y > 1: 
      print 'beginning url checks' 
      num_left = len(items) 


      clear_img(items) 
      while num_left is not 0: 
       print 'Waiting' 

      num_left = 0 
      batch = {"vran": currGroup, 
        "bnum": currGroupNum, 
        "items": newItems, 
        } 
      if len(checkedItems) > 0: 
       batches.insert_one(batch) 
       currGroupNum += 1 

     currGroup = x[1] 
     items = [] 
     checkedItems = [] 

    items.append(item) 

    if i % 100 == 0: 
     print "Milestone: " + str(i) 

print "done" 

その他の考慮事項:などに30個の別々のTSVファイルを元Tsvをを分割して並列にバッチスクリプトを30回実行します。これは違いをもたらすだろうか?

+1

イメージが「通常の」Webサーバーから要求された場合は、GET要求ではなくHEADを実行できます。 –

+0

ああ、それは助けてくれるだろう、私はそれを行かせるだろう。 –

+0

Webサーバーからの応答を取得する非同期の性質は、CPUコア全体にタスクを分散することをより重視するマルチプロセッシングライブラリには適していません。ワーカープールのサイズを大幅に増やして、ここにあるすべてのI/Oバウンドブロッキングを許可したいと思うかもしれません。 – paddy

答えて

2
  1. HEADリクエストを使用して実際の画像を必要としないので、速度を向上させる必要があります。応答が200でも404でもない場合はHEADが許可されない場合があり(405)、GET要求を使用して再度試行します。
  2. 現在、新しいタスクを開始する前に現在のグループが終了するのを待っています。一般に、常に同じ数の実行要求をほぼ同じに保つことが望ましいでしょう。また、作業者のプールを大幅に増やしたいと思うかもしれません。タスクは主にI/O-Boundであるため、3行目(つまり非同期I/O)に沿って何かを実行することをお勧めします。
  3. あなたは、Python 3を使用することに開いている場合は、https://pypi.python.org/pypi/aiohttpを使用して非同期I/O(https://docs.python.org/3/library/asyncio.html)のための優れたサポートを利用することができます:
import asyncio 
from aiohttp import ClientSession, Timeout 
import csv 
import re 
from threading import Thread 
from queue import Queue 
from time import sleep 

async def check(url, session): 
    try: 
     with Timeout(10): 
      async with session.head(url) as response: 
       if response.status == 200: 
        return True 
       elif response.status == 404: 
        return False 
       else: 
        async with session.get(url) as response: 
         return (response.status == 200) 
    except: 
     return False 



def worker(q): 
    while True: 
     f = q.get() 
     try: 
      f() 
     except Exception as e: 
      print(e) 
     q.task_done() 

q = Queue() 
for i in range(4): 
    t = Thread(target=worker,args=(q,)) 
    t.daemon = True 
    t.start() 

def item_ok(url): 
    #Do something 
    sleep(0.5) 
    pass 

def item_failed(url): 
    #Do something 
    sleep(0.5) 
    pass 

def group_done(name,g): 
    print("group %s with %d items done (%d failed)\n" % 
      (name,g['total'],g['fail'])) 

async def bound_check(sem, item, session, groups): 
    async with sem: 
     g = groups[item["group"]] 
     if (await check(item["item_url"], session)): 
      g["success"] += 1 
      q.put(lambda: item_ok(item["item_url"])) 
     else: 
      g["fail"] += 1 
      q.put(lambda: item_failed(item["item_url"])) 
     if g["success"] + g["fail"] == g['total']: 
      q.put(lambda: group_done(item['group'],g)) 
     bound_check.processed += 1 
     if bound_check.processed % 100 == 0: 
      print ("Milestone: %d\n" % bound_check.processed) 

bound_check.processed = 0 

groups = {} 

async def run(max_pending=1000): 
    #Choose such that you do not run out of FDs 
    sem = asyncio.Semaphore(max_pending) 

    f = open('./test.tsv', 'r',encoding = 'utf8') 
    reader = csv.reader(f, delimiter='\n') 

    tasks = [] 

    async with ClientSession() as session: 
     for _, utf8_row in enumerate(reader): 
      unicode_row = utf8_row[0] 
      x = re.split(r'\t', unicode_row) 
      item = {"id": x[0],"group": x[1],"item_url": x[2]} 
      if not item["group"] in groups: 
       groups[item["group"]] = {'total' : 1, 
             'success' : 0, 
             'fail'  : 0, 
             'items' : [item]} 
      else: 
       groups[item["group"]]['total'] += 1 
       groups[item["group"]]['items'].append(item) 
      task = asyncio.ensure_future(bound_check(sem, item, session, groups)) 
      tasks.append(task) 

     responses = asyncio.gather(*tasks) 
     await responses 

loop = asyncio.get_event_loop() 
loop.run_until_complete(run()) 
q.join() 

print("Done") 
2

それは既にあなたが使用しようとするべきであると述べました。 GETの代わりにHEADです。それはイメージをダウンロードする必要はありません。さらに、要求ごとに個別のプロセスを生成しているように見えますが、これも非効率的です。

ここでは、asyncioの使用が実際には必要ないと思います。プレーンスレッドプール(プロセスプールではない)を使用するソリューションは、把握するのが簡単です。さらに、Python 2.7でも利用できます。

import requests 
from concurrent.futures import ThreadPoolExecutor, as_completed 
import csv 
from collections import defaultdict 

def read_rows(file): 
    with open(file) as f_in: 
     return [row for row in csv.reader(f_in, delimiter='\t')] 

def check_url(inp): 
    """Gets called by workers in thread pool. Checks for existence of URL.""" 
    id, grp, url = inp 
    def chk(): 
     try: 
      return requests.head(url).status_code == 200 
     except IOError as e: 
      return False 
    return (id, grp, url, chk()) 

if __name__ == '__main__': 
    d = defaultdict(lambda: []) 
    with ThreadPoolExecutor(max_workers=20) as executor: 
     future_to_input = {executor.submit(check_url, inp): inp for inp in read_rows('urls.txt')} 
     for future in as_completed(future_to_input): 
      id, grp, url, res = future.result() 
      d[grp].append((id, url, res)) 
    # do something with your d (e.g. sort appropriately, filter those with len(d[grp]) <= 1, ...) 
    for g, bs in d.items(): 
     print(g) 
     for id, url, res in bs: 
      print(" %s %5s %s" % (id, res, url)) 

あなたが見ることができるように、私は、個々にCSV入力の各行を処理し、その結果(dを使用)ではなく、入力にグルーピングを行います。ほとんどの味の問題は、私は思います。あなたはmax_workers=20で遊んで、それを増やしたいかもしれません。

関連する問題