2017-10-01 5 views
0

マルチプロセッシングプールを持つプロセスプールを作成します。私は対処すべき多くのタスクを持っていますが、タスクのqpsを取得することは容易ではありません。だから私は適切なプールのサイズを設定できるように、プールのアクティブなプロセス番号を取得したい。これは全体のコードです:あなたはそれを行うのかどうかを確認するに.ready()を使用することができます https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResultPythonはマルチプロセッシングプールのアクティブなプロセス番号を取得します

import time 
from multiprocessing import Pool 

def do_work(msg): 
    # do some work 


if __name__ == '__main__': 
    consumer = KafkaConsumer(
    group_id=worker_config.kafka_group_id, 
    bootstrap_servers=kafka_url, 
    auto_offset_reset=worker_config.kafka_reset, 
    enable_auto_commit=True) 
    consumer.subscribe(topics=worker_config.kafka_topics) 

    for message in consumer: 
     logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg)) 
     pool.apply_async(do_work, (message,)) 
     process_count = number_of_active_process_of_pool 
     logging.info("number_of_active_process_number is %d", process_count) 


    pool.close() 
    pool.join() 

答えて

0

apply_asyncはあなたにAsyncResultを与えます。このようにして、実行されたタスクの量を取得し、延長されたタスクの量を取得します。 この数がpoolsizeを超えていれば、多くのプロセスがプールされていると仮定できます。プールされていない場合は、実行中のプロセスの量が残りの量になります。

代替:

あなたはapply_async使用せず、代わりにキューされない場合は、そのようなこの1のように: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue

あなたはその後、.qsize()

でおおよそのキューサイズを取得することができますもありますmultiprocessing.active_childrenしかし、これらのプロセスが終了した場合にのみ動作しますが、プールは動作しません。あなたがそれを注文しない限り.join() あなたのケースではそれはうまくいくでしょう。

+0

答えに感謝します。コード全体を以下に示します。私はカフカからのメッセージを受け取ります。プール・サイズは8です。メッセージ数が8を超えると、プロセス数は依然として8であり、メッセージはプロセス・プールに保管されます。しかし、メッセージの数が6などの8より少ない場合、プール内のアクティブなプロセスの数は6です。プール内のアクティブなプロセスの数を知りたいのは、6です。私がkafkaからメッセージを受け取ったとき私はそれをプールに投げるので、私はそのプロセスの結果には気を付けません。 – buaawht

+0

私はすべてのメッセージの数を知らないので、 '.ready()'を使用して ''完了したタスク量を取得してください。 – buaawht

+0

apply_asyncを使用してタスクを追加するたびに、返されたタスクオブジェクトがリストに追加されます。あなたが残した仕事量が必要なときはいつでも、あなたはリストを通って準備ができているすべての結果を取り出す。すると、リストを作成して残りのタスクを実行する必要があります。 – Berserker

関連する問題