マルチプロセッシングプールを持つプロセスプールを作成します。私は対処すべき多くのタスクを持っていますが、タスクのqpsを取得することは容易ではありません。だから私は適切なプールのサイズを設定できるように、プールのアクティブなプロセス番号を取得したい。これは全体のコードです:あなたはそれを行うのかどうかを確認するに.ready()
を使用することができます https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResultPythonはマルチプロセッシングプールのアクティブなプロセス番号を取得します
:
import time
from multiprocessing import Pool
def do_work(msg):
# do some work
if __name__ == '__main__':
consumer = KafkaConsumer(
group_id=worker_config.kafka_group_id,
bootstrap_servers=kafka_url,
auto_offset_reset=worker_config.kafka_reset,
enable_auto_commit=True)
consumer.subscribe(topics=worker_config.kafka_topics)
for message in consumer:
logging.info('topic=%s, partition=%d, msg=%s' % (message.topic, message.partition, msg))
pool.apply_async(do_work, (message,))
process_count = number_of_active_process_of_pool
logging.info("number_of_active_process_number is %d", process_count)
pool.close()
pool.join()
答えに感謝します。コード全体を以下に示します。私はカフカからのメッセージを受け取ります。プール・サイズは8です。メッセージ数が8を超えると、プロセス数は依然として8であり、メッセージはプロセス・プールに保管されます。しかし、メッセージの数が6などの8より少ない場合、プール内のアクティブなプロセスの数は6です。プール内のアクティブなプロセスの数を知りたいのは、6です。私がkafkaからメッセージを受け取ったとき私はそれをプールに投げるので、私はそのプロセスの結果には気を付けません。 – buaawht
私はすべてのメッセージの数を知らないので、 '.ready()'を使用して ''完了したタスク量を取得してください。 – buaawht
apply_asyncを使用してタスクを追加するたびに、返されたタスクオブジェクトがリストに追加されます。あなたが残した仕事量が必要なときはいつでも、あなたはリストを通って準備ができているすべての結果を取り出す。すると、リストを作成して残りのタスクを実行する必要があります。 – Berserker