2017-02-22 7 views
0

ファイルを読み込んで処理するこのコードはありますか?ファイルはかなり大きく、1,200万行なので、私はこれを手動で1000行のファイルに分割し、1000行ごとに順番に各プロセスを開始します(bashスクリプト)。Twisted/Python - 大きなファイルを行単位で処理する

Twistedを使用してファイルをロードし、1つのファイルから1000個のアイテムを処理する方法はありますか(プログレスバーはいいですか?)手動で分割する必要はありませんか?

scanner.py

import argparse 

from tqdm import tqdm 
from sys import argv 
from pprint import pformat 

from twisted.internet.task import react 
from twisted.web.client import Agent, readBody 
from twisted.web.http_headers import Headers 

import lxml.html 

from geoip import geolite2 
import pycountry 

from tld import get_tld 
import json 
import socket 

poweredby = "" 
server = "" 
ip = "" 


def cbRequest(response, url): 
    global poweredby, server, ip 
    # print 'Response version:', response.version 
    # print 'Response code:', response.code 
    # print 'Response phrase:', response.phrase 
    # print 'Response headers:' 
    # print pformat(list(response.headers.getAllRawHeaders())) 
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0] 
    server = response.headers.getRawHeaders("Server")[0] 

    #print poweredby 
    #print server 

    d = readBody(response) 
    d.addCallback(cbBody, url) 
    return d 


def cbBody(body, ourl): 
    global poweredby, server,ip 

    #print body 
    html_element = lxml.html.fromstring(body) 
    generator = html_element.xpath("//meta[@name='generator']/@content") 

    ip = socket.gethostbyname(ourl) 

    try: 
     match = geolite2.lookup(ip) 
     if match is not None: 
      country = match.country 
      try: 

       c = pycountry.countries.lookup(country) 
       country = c.name 
      except: 
       country = "" 

    except: 
     country = "" 
    try: 
     res = get_tld("http://www" + ourl, as_object=True) 
     tld = res.suffix 
    except: 
     tld = "" 

    try: 
     match = re.search(r'[\w\.-][email protected][\w\.-]+', body) 
     email = match.group(0) 
    except: 
     email = "" 

    permalink=ourl.rstrip().replace(".","-") 

    try: 
     item = generator[0] 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 
    except: 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 


    print val 

if __name__ == '__main__': 
    parser = argparse.ArgumentParser(description='Scanner v0.99') 
    parser.add_argument(
     '-i', '--input', help='Input list of domains', required=True) 
    args = parser.parse_args() 
    input = args.input 

with open(input) as f: 
    urls = f.read().splitlines() 


def mainjob(reactor, urls=urls): 
    for url in tqdm(urls): 
     agent = Agent(reactor) 
     d = agent.request(
      'GET', "http://" + url, 
      Headers({'User-Agent': ['bot']}), 
      None) 
     d.addCallback(cbRequest, url) 
     d.addErrback(lambda x: None) # ignore errors 
    return d 


react(mainjob, argv[3:]) 

アップデート1:

今、私はこのようにそれを実行します。

file.txtは - 12,000,000ライン

chunk01.txt - を持つファイル1000行 。 。 。

チャンクファイルごとにスクリプトを実行します。

python scanner.py chunk01.txt 
python scanner.py chunk02.txt 
. 
. 
. 

一度スクリプトを実行したい:

python scanner.py file.txt 

問題は)私が(反応する引数としてURLを渡す必要があること、あります。私がメモリに(f.read()経由で)12,000,000個のファイルを読み込むと大きすぎます。それで、私はファイルを分割し、各小さなファイルごとにスクリプトを実行しました。

アップデート2 ...それは今明確であるホープ:ジャン=ポール・Calderoneの答え@に基づいて

、私はこのコードを調理しました。

しかし、私は上のことからぶつかっています、動作しているようです:....私は18万ドメイン(入力ファイルから各行を)引き受ける、スクリプトがのみ約を出力/印刷した

18万イテレーション35707行(エントリ)。私はそれが180,000に近い何かになると期待します...私はいくつかのドメインがタイムアウトすることを知っています。私はそれを "古い"方法で実行すると、より一貫性があり、数値が近くなった、すなわち、入力ドメインの数が出力ファイルの出力行に近くなった。

コードに「悪い」ものがありますか?何か案は?

python scanner.py > out.txt 

181668it [1:47:36, 4.82it/s] 

及び行カウント:

wc -l out.txt 
36840 out.txt 

scanner.py

import argparse 

from tqdm import tqdm 
from sys import argv 
from pprint import pformat 

from twisted.internet.task import react 
from twisted.web.client import Agent, readBody 
from twisted.web.http_headers import Headers 
from twisted.internet.task import cooperate 
from twisted.internet.defer import gatherResults 

import lxml.html 

from geoip import geolite2 
import pycountry 

from tld import get_tld 
import json 
import socket 

poweredby = "" 
server = "" 
ip = "" 


def cbRequest(response, url): 
    global poweredby, server, ip 
    # print 'Response version:', response.version 
    # print 'Response code:', response.code 
    # print 'Response phrase:', response.phrase 
    # print 'Response headers:' 
    # print pformat(list(response.headers.getAllRawHeaders())) 
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0] 
    server = response.headers.getRawHeaders("Server")[0] 

    #print poweredby 
    #print server 

    d = readBody(response) 
    d.addCallback(cbBody, url) 
    return d 


def cbBody(body, ourl): 
    global poweredby, server,ip 

    #print body 
    html_element = lxml.html.fromstring(body) 
    generator = html_element.xpath("//meta[@name='generator']/@content") 

    ip = socket.gethostbyname(ourl) 

    try: 
     match = geolite2.lookup(ip) 
     if match is not None: 
      country = match.country 
      try: 

       c = pycountry.countries.lookup(country) 
       country = c.name 
      except: 
       country = "" 

    except: 
     country = "" 
    try: 
     res = get_tld("http://www" + ourl, as_object=True) 
     tld = res.suffix 
    except: 
     tld = "" 

    try: 
     match = re.search(r'[\w\.-][email protected][\w\.-]+', body) 
     email = match.group(0) 
    except: 
     email = "" 

    permalink=ourl.rstrip().replace(".","-") 

    try: 
     item = generator[0] 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 
    except: 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 


    print val 


def main(reactor, url_path): 
    urls = open(url_path) 
    return mainjob(reactor, (url.strip() for url in urls)) 

def mainjob(reactor, urls=argv[2:]): 
    #for url in urls: 
    # print url 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    tasks = list(cooperate(work) for i in range(100)) 
    return gatherResults(list(task.whenDone() for task in tasks)) 



def process(agent, url): 
    d = agent.request(
     'GET', "http://" + url, 
     Headers({'User-Agent': ['bot']}), 
     None) 
    d.addCallback(cbRequest, url) 
    d.addErrback(lambda x: None) # ignore errors 
    return d 

react(main, ["./domains.txt"]) 

アップデート3:

エラーにエラーを印刷するためのコードを更新します。TXT

import argparse 

from tqdm import tqdm 
from sys import argv 
from pprint import pformat 

from twisted.internet.task import react 
from twisted.web.client import Agent, readBody 
from twisted.web.http_headers import Headers 
from twisted.internet.task import cooperate 
from twisted.internet.defer import gatherResults 

import lxml.html 

from geoip import geolite2 
import pycountry 

from tld import get_tld 
import json 
import socket 

poweredby = "" 
server = "" 
ip = "" 

f = open("errors.txt", "w") 


def error(response, url): 
    f.write("Error: "+url+"\n") 


def cbRequest(response, url): 
    global poweredby, server, ip 
    # print 'Response version:', response.version 
    # print 'Response code:', response.code 
    # print 'Response phrase:', response.phrase 
    # print 'Response headers:' 
    # print pformat(list(response.headers.getAllRawHeaders())) 
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0] 
    server = response.headers.getRawHeaders("Server")[0] 

    #print poweredby 
    #print server 

    d = readBody(response) 
    d.addCallback(cbBody, url) 
    return d 


def cbBody(body, ourl): 
    global poweredby, server,ip 

    #print body 
    html_element = lxml.html.fromstring(body) 
    generator = html_element.xpath("//meta[@name='generator']/@content") 

    ip = socket.gethostbyname(ourl) 

    try: 
     match = geolite2.lookup(ip) 
     if match is not None: 
      country = match.country 
      try: 

       c = pycountry.countries.lookup(country) 
       country = c.name 
      except: 
       country = "" 

    except: 
     country = "" 
    try: 
     res = get_tld("http://www" + ourl, as_object=True) 
     tld = res.suffix 
    except: 
     tld = "" 

    try: 
     match = re.search(r'[\w\.-][email protected][\w\.-]+', body) 
     email = match.group(0) 
    except: 
     email = "" 

    permalink=ourl.rstrip().replace(".","-") 

    try: 
     item = generator[0] 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 
    except: 
     val = "{ \"Domain\":" + json.dumps(
      "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
      str(server)) + ",\"PoweredBy\":" + json.dumps(
       str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
        email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country+"\",\"permalink\":\""+permalink+"\" }" 


    print val 


def main(reactor, url_path): 
    urls = open(url_path) 
    return mainjob(reactor, (url.strip() for url in urls)) 

def mainjob(reactor, urls=argv[2:]): 
    #for url in urls: 
    # print url 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    tasks = list(cooperate(work) for i in range(100)) 
    return gatherResults(list(task.whenDone() for task in tasks)) 



def process(agent, url): 
    d = agent.request(
     'GET', "http://" + url, 
     Headers({'User-Agent': ['crawler']}), 
     None) 
    d.addCallback(cbRequest, url) 
    d.addErrback(error, url) 
    return d 

react(main, ["./domains.txt"]) 

f.close() 

更新4:

[email protected]:~/crawler$ python scanner.py 
2it [00:00, 840.71it/s] 
[email protected]:~/crawler$ cat errors.txt 
Error: google.al 
Error: fau.edu.al 

あなたは、彼らがエラーを持っていた見ることができるように、しかし:

は、私はちょうど2つのドメインと、Wiresharkのでトラフィックをキャプチャし、それらのドメインは、以前にエラーが発生しましたWiresharkと私は応答を見る:

enter image description here

+1

"for line in f"のようにファイルを処理する必要があります。 f.read()はファイル全体をメモリに読み込みます。ここをクリックしてください:http://stackoverflow.com/a/8010133/1904113 – MKesper

+0

ありがとう!私はこれを認識していますが、うまくいけば、それをねじれた(反応する)ためのパラメータ(URL)として渡す必要があります。行ごとに読み込むことは、プログラムをこの時点まで進めるのではなく、ファイルを読み込む際に停止します。反応してPythonに入りました。 –

+0

新しいコードで何をしたいか分かりません。あなたは "1つのファイルから1000個のアイテムを処理"することで私を失った。あなたはこの目標を再現しようとすることができますか? –

答えて

1

プログラムが作成する並行処理の量に制限を加える必要があります。現在、あなたは同時に与えられたすべてのURLを処理する - あるいは少なくとも、してみてください:

def mainjob(reactor, urls=urls): 
    for url in tqdm(urls): 
     agent = Agent(reactor) 
     d = agent.request(
      'GET', "http://" + url, 
      Headers({'User-Agent': ['bot']}), 
      None) 
     d.addCallback(cbRequest, url) 
     d.addErrback(lambda x: None) # ignore errors 
    return d 

これはそれらのいずれかが完了するのを待たずに、各URLの要求を発行します。代わりに、一度に限定数を実行するにはtwisted.internet.task.cooperateを使用してください。これにより、一度に1つのリクエストが実行されます。

def mainjob(reactor, urls): 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    task = cooperate(work) 
    return task.whenDone() 

def process(agent, url): 
    d = agent.request(
     'GET', "http://" + url, 
     Headers({'User-Agent': ['bot']}), 
     None) 
    d.addCallback(cbRequest, url) 
    d.addErrback(lambda x: None) # ignore errors 
    return d 

おそらくそれ以上のものが必要です。したがって、もう一度cooperate()を呼び出してください:

def mainjob(reactor, urls=urls): 
    agent = Agent(reactor) 
    work = (process(agent, url) for url in tqdm(urls)) 
    tasks = list(cooperate(work) for i in range(100)) 
    return gatherResults(list(task.whenDone() for task in tasks)) 

これは一度に100リクエストまで実行します。各タスクは次の要素をworkから引き出し、それを待ちます。 gatherResultsは、100のタスクがすべて終了するのを待ちます。

今だけ時にメモリに完全な入力を読み込む避ける:

def main(reactor, url_path): 
    urls = open(url_path) 
    return mainjob(reactor, (url.strip() for url in urls)) 

react(main, ["path-to-urls.txt"]) 

これは、URLファイルを開きますが、それらが必要だとしてだけそれから行を読み取ります。

+0

THAAAAAAAANKS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!完璧な答え! –

+0

えええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええええと>ファイル内のドメインへの番号、タイムアウトなど)。私はUpdate 2でそれを説明しました:@ Jean-Paul Calderoneあなたはどんな考えがありますか? –

+0

良いステップは、失敗を数えることができるようにすることです。コードはすぐにそれらを放棄します。失敗数をカウントすると、成功+失敗の合計が正しいかどうかを確認し、失敗を見て改善すべき点があるかどうかを調べることができます。まだ合計が小さい場合は、ドライバを見て、どこにどのようにジョブが落ちているかを見ることができます。 –

関連する問題