7

私はpython libtorrentを使用して1日あたり約10k +トレントのメタデータを取得しようとしています。100k +トレントのメタデータを取得するためにPython libtorrentを使用してデーモンを作成

これは、コード

  1. スタートlibtorrentセッションの電流の流れです。
  2. 最近1日以内にアップロードするメタデータが必要なトレントの合計数を取得します。
  3. チャンクでDBから急流ハッシュを取得する
  4. これらのハッシュを使用してマグネットリンクを作成し、マグネットURIごとにハンドルを作成することで、マグネットURIをセッションに追加します。
  5. メタデータがフェッチされている間に1秒間スリープして、メタデータが見つかったかどうかを確認し続けます。
  6. メタデータが受信された場合はDBに追加します。約10分間メタデータを検索したかどうかを確認し、そうであればハンドルを削除します。
  7. 上記は無期限です。セッションの状態を将来保存することができます。

これまでこれを試しました。

#!/usr/bin/env python 
# this file will run as client or daemon and fetch torrent meta data i.e. torrent files from magnet uri 

import libtorrent as lt # libtorrent library 
import tempfile # for settings parameters while fetching metadata as temp dir 
import sys #getting arguiments from shell or exit script 
from time import sleep #sleep 
import shutil # removing directory tree from temp directory 
import os.path # for getting pwd and other things 
from pprint import pprint # for debugging, showing object data 
import MySQLdb # DB connectivity 
import os 
from datetime import date, timedelta 

session = lt.session(lt.fingerprint("UT", 3, 4, 5, 0), flags=0) 
session.listen_on(6881, 6891) 
session.add_extension('ut_metadata') 
session.add_extension('ut_pex') 
session.add_extension('smart_ban') 
session.add_extension('metadata_transfer') 

session_save_filename = "/magnet2torrent/magnet_to_torrent_daemon.save_state" 

if(os.path.isfile(session_save_filename)): 

    fileread = open(session_save_filename, 'rb') 
    session.load_state(lt.bdecode(fileread.read())) 
    fileread.close() 
    print('session loaded from file') 
else: 
    print('new session started') 

session.add_dht_router("router.utorrent.com", 6881) 
session.add_dht_router("router.bittorrent.com", 6881) 
session.add_dht_router("dht.transmissionbt.com", 6881) 
session.add_dht_router("dht.aelitis.com", 6881) 

session.start_dht() 
session.start_lsd() 
session.start_upnp() 
session.start_natpmp() 

alive = True 
while alive: 

    db_conn = MySQLdb.connect( host = '', user = '', passwd = '', db = '', unix_socket='/mysql/mysql.sock') # Open database connection 
    #print('reconnecting') 
    #get all records where enabled = 0 and uploaded within yesterday 
    subset_count = 100 ; 

    yesterday = date.today() - timedelta(1) 
    yesterday = yesterday.strftime('%Y-%m-%d %H:%M:%S') 
    #print(yesterday) 

    total_count_query = ("SELECT COUNT(*) as total_count FROM content WHERE upload_date > '"+ yesterday +"' AND enabled = '0' ") 
    #print(total_count_query) 
    try: 
     total_count_cursor = db_conn.cursor()# prepare a cursor object using cursor() method 
     total_count_cursor.execute(total_count_query) # Execute the SQL command 
     total_count_results = total_count_cursor.fetchone() # Fetch all the rows in a list of lists. 
     total_count = total_count_results[0] 
     print(total_count) 
    except: 
      print "Error: unable to select data" 

    total_pages = total_count/subset_count 
    #print(total_pages) 

    current_page = 1 
    while(current_page <= total_pages): 
     from_count = (current_page * subset_count) - subset_count 

     #print(current_page) 
     #print(from_count) 

     hashes = [] 

     get_mysql_data_query = ("SELECT hash FROM content WHERE upload_date > '" + yesterday +"' AND enabled = '0' ORDER BY record_num DESC LIMIT "+ str(from_count) +" , " + str(subset_count) +" ") 
     #print(get_mysql_data_query) 
     try: 
      get_mysql_data_cursor = db_conn.cursor()# prepare a cursor object using cursor() method 
      get_mysql_data_cursor.execute(get_mysql_data_query) # Execute the SQL command 
      get_mysql_data_results = get_mysql_data_cursor.fetchall() # Fetch all the rows in a list of lists. 
      for row in get_mysql_data_results: 
       hashes.append(row[0].upper()) 
     except: 
      print "Error: unable to select data" 

     #print(hashes) 

     handles = [] 

     for hash in hashes: 
      tempdir = tempfile.mkdtemp() 
      add_magnet_uri_params = { 
       'save_path': tempdir, 
       'duplicate_is_error': True, 
       'storage_mode': lt.storage_mode_t(2), 
       'paused': False, 
       'auto_managed': True, 
       'duplicate_is_error': True 
      } 
      magnet_uri = "magnet:?xt=urn:btih:" + hash.upper() + "&tr=udp%3A%2F%2Ftracker.openbittorrent.com%3A80&tr=udp%3A%2F%2Ftracker.publicbt.com%3A80&tr=udp%3A%2F%2Ftracker.ccc.de%3A80" 
      #print(magnet_uri) 
      handle = lt.add_magnet_uri(session, magnet_uri, add_magnet_uri_params) 
      handles.append(handle) #push handle in handles list 

     #print("handles length is :") 
     #print(len(handles)) 

     while(len(handles) != 0): 
      for h in handles: 
       #print("inside handles for each loop") 
       if h.has_metadata(): 
        torinfo = h.get_torrent_info() 
        final_info_hash = str(torinfo.info_hash()) 
        final_info_hash = final_info_hash.upper() 
        torfile = lt.create_torrent(torinfo) 
        torcontent = lt.bencode(torfile.generate()) 
        tfile_size = len(torcontent) 
        try: 
         insert_cursor = db_conn.cursor()# prepare a cursor object using cursor() method 
         insert_cursor.execute("""INSERT INTO dht_tfiles (hash, tdata) VALUES (%s, %s)""", [final_info_hash , torcontent]) 
         db_conn.commit() 
         #print "data inserted in DB" 
        except MySQLdb.Error, e: 
         try: 
          print "MySQL Error [%d]: %s" % (e.args[0], e.args[1]) 
         except IndexError: 
          print "MySQL Error: %s" % str(e)  


        shutil.rmtree(h.save_path()) # remove temp data directory 
        session.remove_torrent(h) # remove torrnt handle from session 
        handles.remove(h) #remove handle from list 

       else: 
        if(h.status().active_time > 600): # check if handle is more than 10 minutes old i.e. 600 seconds 
         #print('remove_torrent') 
         shutil.rmtree(h.save_path()) # remove temp data directory 
         session.remove_torrent(h) # remove torrnt handle from session 
         handles.remove(h) #remove handle from list 
       sleep(1)   
       #print('sleep1') 

     #print('sleep10') 
     #sleep(10) 
     current_page = current_page + 1 

     #save session state 
     filewrite = open(session_save_filename, "wb") 
     filewrite.write(lt.bencode(session.save_state())) 
     filewrite.close() 


    print('sleep60') 
    sleep(60) 

    #save session state 
    filewrite = open(session_save_filename, "wb") 
    filewrite.write(lt.bencode(session.save_state())) 
    filewrite.close() 

私は一晩実行中のスクリプト以上に保つと急流のメタデータは、一晩のセッションで発見されただけで1200の周りに見つけてみました。 私はスクリプトのパフォーマンスを向上させるために探しています。

私もsave_stateファイルをデコードしようと、700 + DHT nodesは私が接続していますがあります気づきました。したがってDHTのようなものは実行されていません。

メタデータがフェッチされないうちに、無期限にセッション中にkeep the handles activeが実行されます。私が現在やっているように、10分後にハンドルを取り除かないでください。

lib-torrentのpythonバインディングに関する質問はほとんどありません。

  1. 実行できるハンドルはいくつですか?実行中のハンドルに制限はありますか?
  2. 10k +または100kハンドルでシステムの処理速度が低下しますか?またはリソースを食べる?はいの場合はどのリソースですか?私はRAM、ネットワークを意味しますか?
  3. 私はファイアウォールの背後にあり、ブロックされた受信ポートがメタデータフェッチの遅い速度を引き起こす可能性がありますか?
  4. router.bittorrent.comのようなDHTサーバーや、あまりにも多くのリクエストを送信するための他のBAN私のIPアドレスを使用できますか?
  5. 他の同輩がBAN私のIPアドレスを見つけたら、私はあまりにも多くのリクエストを作成しているだけで、メタデータを取得できないでしょうか?
  6. このスクリプトの複数のインスタンスを実行できますか?またはマルチスレッドである可能性がありますか?より良いパフォーマンスを提供しますか?
  7. 同じスクリプトの複数のインスタンスを使用している場合、使用しているIPとポートに応じて各スクリプトに一意のノードIDが割り当てられますが、これは実行可能なソリューションですか?

これ以上の方法はありますか?私が何をしようとしているのか?

答えて

3

私はlibtorrentのAPIに固有の質問に答えることはできませんが、あなたの質問のいくつかは一般的にビットトレントに適用されます。

私のシステムの処理速度が10k +または100kのハンドルが動作しますか?またはリソースを食べる?はいの場合はどのリソースですか?私はRAM、ネットワークを意味しますか?

メタデータダウンロードはフルトレントダウンロードではないため、実際のファイルなどを割り当てることができないため、多くのリソースを使用すべきではありません。しかし、メタデータ自体の最初のチャンクを取得すると、ラム/ディスクスペースが必要になります。

私はファイアウォールの背後にあり、ブロックされた受信ポートは、メタデータフェッチの速度が遅くなる可能性がありますか?

はい、接続を確立できるピアの数を減らすことで、ピア数の少ないスウォームでメタデータをフェッチする(またはすべての接続を確立する)ことがより困難になります。

NATは同じ問題を引き起こす可能性があります。

多くのリクエストを送信するために、router.bittorrent.comや他のBANの私のIPアドレスのようなDHTサーバーを使用できますか?

router.bittorrent.comは、サーバー自体ではなく、ブートストラップノードです。ルックアップは単一ノードを照会せず、数多くの異なる(何百万もの)クエリを照会します。しかし、はい、個々のノードはあなたを禁止する可能性があります。

これは、ランダムに分散されたIDを探して、DHTキースペース全体に負荷を分散することによって緩和できます。

私はこのスクリプトの複数のインスタンスを実行できますか?またはマルチスレッドである可能性がありますか?より良いパフォーマンスを提供しますか?

AIUI libtorrentは、十分なノンブロッキングまたはマルチスレッドなので、一度に多くのトレントをスケジュールできます。

libtorrentに発信DHT要求のレート制限があるかどうかわかりません。

同じスクリプトの複数のインスタンスを使用する場合、各スクリプトは使用しているIPとポートに応じて一意のノードIDを取得しますが、これは実行可能なソリューションですか?

DHTノードIDを意味する場合は、ポート番号ではなく、IPアドレス(BEP 42)から派生しています。いくつかのランダム要素が含まれていますが、IPごとに限られた量のIDを取得できます。

そして、このうちのいくつかは、あなたのシナリオに適用される場合があります:http://blog.libtorrent.org/2012/01/seeding-a-million-torrents/

そして、別のオプションは、バルク・フェッチ急流にCLIが含まmy own DHT implementationです。

+0

本当にありがとうございます、ありがとうございます。 – AMB

関連する問題