2016-09-28 6 views
1

私はデータベースレコードセット(約1000行)を持っていますが、現在、レコードごとに余分なDBクエリを使用してより多くのデータを統合するために、それらを繰り返しています。レコードセット内のPythonマルチスレッド

これを実行すると、処理時間が100秒になることがあります。

私がしたいことは、この機能を2-4のプロセスと共有することです。

私はPython 2.7を使用してAWSラムダ互換性を持っています。

def handler(event, context): 

    try: 

     records = connection.get_users() 

     mandrill_client = open_mandrill_connection() 

     mandrill_messages = get_mandrill_messages() 

     mandrill_template = 'POINTS weekly-report-to-user' 

     start_time = time.time() 

     messages = build_messages(mandrill_messages, records) 

     print("OVERALL: %s seconds ---" % (time.time() - start_time)) 

     send_mandrill_message(mandrill_client, mandrill_template, messages) 

     connection.close_database_connection() 

     return "Process Completed" 

    except Exception as e: 

     print(e) 

私がスレッドに入れたい機能を以下に示します。

def build_messages(messages, records): 

for record in records: 

    record = dict(record) 

    stream = get_user_stream(record) 

    data = compile_loyalty_stream(stream) 

    messages['to'].append({ 
     'email': record['email'], 
     'type': 'to' 
    }) 

    messages['merge_vars'].append({ 
     'rcpt': record['email'], 
     'vars': [ 
      { 
       'name': 'total_points', 
       'content': record['total_points'] 
      }, 
      { 
       'name': 'total_week', 
       'content': record['week_points'] 
      }, 
      { 
       'name': 'stream_greek', 
       'content': data['el'] 
      }, 
      { 
       'name': 'stream_english', 
       'content': data['en'] 
      } 
     ] 
    }) 

return messages 

私は何をしようとしたことはマルチプロセッシング・ライブラリをインポートします

試し内部のプールを作成し
from multiprocessing.pool import ThreadPool 

をブロックし、このプール内の関数をマップしました。

pool = ThreadPool(4) 

messages = pool.map(build_messages_in, itertools.izip(itertools.repeat(mandrill_messages), records)) 

def build_messages_in(a_b): 
    build_msg(*a_b) 


def build_msg(a, b): 
    return build_messages(a, b) 

def get_user_stream(record): 

    response = [] 

    i = 0 

    for mod, mod_id, act, p, act_created in izip(record['models'], record['model_ids'], record['actions'], 
               record['points'], record['action_creation']): 

     information = get_reference(mod, mod_id) 

     if information: 

      response.append({ 
       'action': act, 
       'points': p, 
       'created': act_created, 
       'info': information 
      }) 

      if (act == 'invite_friend') \ 
        or (act == 'donate') \ 
        or (act == 'bonus_500_general') \ 
        or (act == 'bonus_1000_general') \ 
        or (act == 'bonus_500_cancel') \ 
        or (act == 'bonus_1000_cancel'): 

       response[i]['info']['date_ref'] = act_created 
       response[i]['info']['slug'] = 'attiki' 

      if (act == 'bonus_500_general') \ 
        or (act == 'bonus_1000_general') \ 
        or (act == 'bonus_500_cancel') \ 
        or (act == 'bonus_1000_cancel'): 

       response[i]['info']['title'] = '' 

      i += 1 

    return response 

最後に、ループのをbuild_message関数から削除しました。

「NoneType」オブジェクトは反復可能ではありません。

これを行う正しい方法ですか?

+0

@GhostCat私は質問を提出し、私が試したことを書き留めるのを忘れました。以下は動作するコードですが、以下は動作するはずのコードですが、動作しません。基本的に私はbuild_messages関数をマルチプロセスしようとしています。 – mallix

+0

今すぐ専門家にあなたを助けてもらおう;-) – GhostCat

+0

あなたはそのエラーをどこにしているのかは言わなかった。私が収集できるものから、これは 'mailchimp' APIを使用しています。私は最も長い待ち時間がAPIレスポンスであると仮定していますか? – roganjosh

答えて

2

あなたのコードはかなり深いと思われますので、multithreadingが高レベルで適用されたときにパフォーマンスが向上するとは確信できません。したがって、最大の遅延を与え、特定のボトルネックにどのようにアプローチするかを検討することが重要です。スレッディング制限の詳細については、hereを参照してください。

たとえば、コメントで説明したように、長い時間がかかる単一のタスクを特定できれば、より多くのCPUパワーを活用するためにmultiprocessingを使用して並列化を試みることができます。ここには、あなた自身のコードベースに入らずにPostgresのクエリをミラーリングするのに十分理解できる単純な汎用的な例があります。私はそれが実行不可能な量の努力だと思う。

import multiprocessing as mp 
import time 
import random 
import datetime as dt 

MAILCHIMP_RESPONSE = [x for x in range(1000)] 

def chunks(l, n): 
    n = max(1, n) 
    return [l[i:i + n] for i in range(0, len(l), n)] 


def db_query(): 
    ''' Delayed response from database ''' 
    time.sleep(0.01) 
    return random.random() 


def do_queries(query_list): 
    ''' The function that takes all your query ids and executes them 
    sequentially for each id ''' 
    results = [] 
    for item in query_list: 
     query = db_query() 
     # Your super-quick processing of the Postgres response 
     processing_result = query * 2 
     results.append([item, processing_result]) 
    return results 


def single_processing(): 
    ''' As you do now - equivalent to get_reference ''' 
    result_of_process = do_queries(MAILCHIMP_RESPONSE) 
    return result_of_process 


def multi_process(chunked_data, queue): 
    ''' Same as single_processing, except we put our results in queue rather 
    than returning them ''' 
    result_of_process = do_queries(chunked_data) 
    queue.put(result_of_process) 


def multiprocess_handler(): 
    ''' Divide and conquor on our db requests. We split the mailchimp response 
    into a series of chunks and fire our queries simultaneously. Thus, each 
    concurrent process has a smaller number of queries to make ''' 

    num_processes = 4 # depending on cores/resources 
    size_chunk = len(MAILCHIMP_RESPONSE)/num_processes 
    chunked_queries = chunks(MAILCHIMP_RESPONSE, size_chunk) 

    queue = mp.Queue() # This is going to combine all the results 

    processes = [mp.Process(target=multi_process, 
       args=(chunked_queries[x], queue)) for x in range(num_processes)] 

    for p in processes: p.start() 

    divide_and_conquor_result = [] 
    for p in processes: 
     divide_and_conquor_result.extend(queue.get()) 

    return divide_and_conquor_result 


if __name__ == '__main__': 
    start_single = dt.datetime.now() 

    single_process = single_processing() 

    print "Single process took {}".format(dt.datetime.now() - start_single) 
    print "Number of records processed = {}".format(len(single_process)) 

    start_multi = dt.datetime.now() 

    multi = multiprocess_handler() 

    print "Multi process took {}".format(dt.datetime.now() - start_multi) 
    print "Number of records processed = {}".format(len(multi)) 
関連する問題