
Async crawler does not work correctly [python]: I was writing an asynchronous crawler and a synchronous crawler, and I am now facing one problem: the two produce different crawl results (they only match when the depth is 1).

from bs4 import BeautifulSoup 
import networkx as nx 
import urllib 
import urllib.request 
from urllib.parse import urlparse 
from urllib.parse import urljoin 
import time 
import asyncio 
import aiohttp 
from contextlib import closing 


class Crawler:

    def __init__(self, delay, depth):
        self.delay = delay
        self.graph = nx.DiGraph()
        self.list_of_links = list()
        self.crawled_urls = list()
        self.depth = depth

    def validate_url(self, url):
        """Check if url is valid"""
        return 'http' in urlparse(url).scheme

    def run(self, async, start_list):
        if async:
            t1 = time.time()
            self.async_crawl(start_list, self.depth)
            t2 = time.time()
            print('Async seconds passed: ', t2 - t1)
        else:
            t1 = time.time()
            for elem in start_list:
                self.crawl(elem, self.depth)
            t2 = time.time()
            print('Sync seconds passed: ', t2 - t1)
        print('Links crawled: ', len(self.crawled_urls))
        print('Edges stored: ', len(self.list_of_links))
        print('Depth: ', self.depth)

    def crawl(self, url, depth):
        if url in self.crawled_urls:
            return []
        if depth and self.validate_url(url):
            self.crawled_urls.append(url)
            links = self.get_links(url)
            for link in links:
                self.list_of_links.append((url, link))
                self.crawl(link, depth - 1)
        else:
            return []

    async def fetch_page(self, session, url):
        """Get one page."""
        if url in self.crawled_urls:
            return []
        else:
            self.crawled_urls.append(url)
        try:
            with aiohttp.Timeout(10):
                async with session.get(url) as response:
                    assert response.status == 200
                    new_urls = self.parse_for_links(url, await response.text())
                    for new_url in new_urls:
                        self.list_of_links.append((url, new_url))
                    return new_urls
        except:
            return []

    def async_crawl(self, urls, depth):
        """Get multiple pages."""
        if depth:
            with closing(asyncio.get_event_loop()) as loop:
                with aiohttp.ClientSession(loop=loop) as session:
                    tasks = [self.fetch_page(session, url) for url in urls if self.validate_url(url)]
                    new_urls = loop.run_until_complete(asyncio.gather(*tasks))
                    if new_urls:
                        self.async_crawl(new_urls[0], depth - 1)

    def parse_for_links(self, url, text):
        soup = BeautifulSoup(text, "html.parser")
        return [urljoin(url, tag['href']) for tag in soup.findAll('a', href=True)]

    def get_links(self, url):
        try:
            req = urllib.request.urlopen(url)
            req = map(lambda x: x.decode('utf-8'), req)
            return self.parse_for_links(url, ''.join(list(req)))
        except:
            return []

    def reset(self):
        self.list_of_links = list()
        self.crawled_urls = list()
        self.graph = nx.DiGraph()

    def visualize(self):
        self.graph.add_edges_from(self.list_of_links)
        nx.write_gexf(self.graph, "graph.gexf")

test2 = ['http://www.aclweb.org/anthology/'] 
cr = Crawler(10, 2) 
cr.run(True, test2) 
cr.reset() 
cr.run(False, test2) 

As an example, here is one of my test cases:

Async seconds passed: 13.632593870162964 
Links crawled: 371 
Edges stored: 15374 
Depth: 2 
Sync seconds passed: 385.6858592033386 
Links crawled: 371 
Edges stored: 102755 
Depth: 2 

The recursive definition of 'async_crawl' never waits for 'self.async_crawl()' to finish. The recursive calls also spawn thousands of event loops, which is probably not what you want. Implement this function with a queue and some kind of mutex to limit the concurrency. – Blender
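
For reference, a minimal sketch of the kind of structure that comment points at: a single event loop, a single ClientSession, and an asyncio.Semaphore (used here instead of a queue/mutex) to cap how many requests run at once, crawling one depth level at a time. The helper names bounded_fetch and crawl and the max_concurrency parameter are illustrative only, not part of the original code; the start URL is the one from the question.

import asyncio
import aiohttp
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup

async def bounded_fetch(session, semaphore, url):
    """Fetch one page while holding the semaphore, so at most
    max_concurrency requests are in flight at any time."""
    async with semaphore:
        try:
            async with session.get(url, timeout=10) as response:
                if response.status != 200:
                    return url, []
                text = await response.text()
        except Exception:
            return url, []
    soup = BeautifulSoup(text, "html.parser")
    return url, [urljoin(url, tag['href']) for tag in soup.findAll('a', href=True)]

async def crawl(start_urls, depth, max_concurrency=20):
    """Breadth-first crawl: finish one whole level before going deeper."""
    semaphore = asyncio.Semaphore(max_concurrency)
    seen, edges = set(), []
    frontier = list(start_urls)
    async with aiohttp.ClientSession() as session:
        for _ in range(depth):
            frontier = [u for u in frontier
                        if u not in seen and 'http' in urlparse(u).scheme]
            seen.update(frontier)
            results = await asyncio.gather(
                *(bounded_fetch(session, semaphore, u) for u in frontier))
            frontier = []
            for url, links in results:
                edges.extend((url, link) for link in links)
                frontier.extend(links)
    return seen, edges

# seen, edges = asyncio.get_event_loop().run_until_complete(
#     crawl(['http://www.aclweb.org/anthology/'], depth=2))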


@Blender Where can I read up on queues? –

Answer


Maybe the best way is to use producers and consumers.

import asyncio
import aiohttp

# The original answer pulled its URLs from a local module
# ("from redd import redis.db.data  # just module for take data").
# Any iterator of start URLs will do; this placeholder is only illustrative.
url_source = iter(['http://www.aclweb.org/anthology/'])

query = asyncio.Queue()
locker = []


async def producer(num):
    """Feed URLs from the data source into the shared queue."""
    baseurl = url_source
    while True:
        try:
            url = next(baseurl)
        except StopIteration:
            print('Producer {} end'.format(num))
            break
        else:
            await query.put(url)


async def consumer(num):
    """Pull URLs off the queue and fetch them."""
    flag = True

    while flag:
        url = await query.get()
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url) as response:
                result = await response.read()
                print(result)
        # Once the queue runs dry, mark this consumer as finished.
        if query.empty() and locker[num] is not True:
            locker[num] = True
            print('Thread number {} is END: {}'.format(num, locker[num]))
        # When every consumer has finished, cancel all tasks and stop the loop.
        if False not in locker:
            for task in asyncio.Task.all_tasks():
                task.cancel()
            loop.stop()


loop = asyncio.get_event_loop()

for i in range(2):
    loop.create_task(producer(i))

for i in range(5):
    locker.append(False)
    loop.create_task(consumer(i))

loop.run_forever()
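
In this producer/consumer layout the queue takes the place of the recursion in the question: the producers push URLs onto a shared asyncio.Queue, a fixed pool of consumer tasks pulls from it, everything runs on one event loop, and the number of simultaneous requests is bounded by the number of consumers.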