2017-06-01 4 views
2

私は、ネットワーク要求(CoAP)を実行するためのAMQPを介してRPC要求を受け付けるプログラムを作成しています。 RPC要求を処理するとき、aioamqpコールバックはネットワークIOを担当するタスクを生成します。これらのタスクは、バックグラウンドタスクと見なすことができ、AMQPを介したストリーミングネットワーク応答に対して無限に実行されます(この場合、1つのRPC要求がRPC応答とデータストリーミングをトリガーします)。Python asyncio:参照されていないタスクはガベージコレクタによって破棄されますか?

私は、ネットワークタスクが無作為な時間間隔(終了前)で破壊されることに気付き、asyncioは「タスクが破棄されましたが保留中です」という警告を表示します。この問題は、https://bugs.python.org/issue21163に記載されている問題と似ています。

今のところ、GCがタスクオブジェクトを破壊するのを防ぐモジュールレベルのリストにハードリファレンスを格納することで、この問題を回避しています。しかし、もっと良い方法があるかどうか疑問に思っていましたか?理想的には、私はRPCコールバックでタスクを呼び出す必要がありますが、これによりこれ以上のAMQP操作が完了しないことに気付きました。新しいamqpチャネルを作成し、amqpを介してrpc要求を受信して​​も停止します。しかし、私はこのストールを引き起こしているのかどうかは確信していません(コールバックはそれ自体がコルーチンですので、待機してもaioamqpライブラリ全体が停止することはないと思います)。

RPCクライアントとサーバー用の以下のソースを投稿していますが、どちらもaioamqp/aiocoapの例に基づいています。サーバーでは、 on_rpc_request はAMQPのRPCコールバックとsend_coap_obs_requestは「obs_tasks.append(タスク)」ステートメントが除去されたときに破壊されますネットワーキングコルーチンです。

client.py:

""" 
    CoAP RPC client, based on aioamqp implementation of RPC examples from RabbitMQ tutorial 
""" 

import base64 
import json 
import uuid 

import asyncio 
import aioamqp 


class CoAPRpcClient(object): 
    def __init__(self): 
     self.transport = None 
     self.protocol = None 
     self.channel = None 
     self.callback_queue = None 
     self.waiter = asyncio.Event() 

    async def connect(self): 
     """ an `__init__` method can't be a coroutine""" 
     self.transport, self.protocol = await aioamqp.connect() 
     self.channel = await self.protocol.channel() 

     result = await self.channel.queue_declare(queue_name='', exclusive=True) 
     self.callback_queue = result['queue'] 

     await self.channel.basic_consume(
      self.on_response, 
      no_ack=True, 
      queue_name=self.callback_queue, 
     ) 

    async def on_response(self, channel, body, envelope, properties): 
     if self.corr_id == properties.correlation_id: 
      self.response = body 

     self.waiter.set() 

    async def call(self, n): 
     if not self.protocol: 
      await self.connect() 
     self.response = None 
     self.corr_id = str(uuid.uuid4()) 
     await self.channel.basic_publish(
      payload=str(n), 
      exchange_name='', 
      routing_key='coap_request_rpc_queue', 
      properties={ 
       'reply_to': self.callback_queue, 
       'correlation_id': self.corr_id, 
      }, 
     ) 
     await self.waiter.wait() 

     await self.protocol.close() 
     return json.loads(self.response) 


async def rpc_client(): 
    coap_rpc = CoAPRpcClient() 

    request_dict = {} 
    request_dict_json = json.dumps(request_dict) 

    print(" [x] Send RPC coap_request({})".format(request_dict_json)) 
    response_dict = await coap_rpc.call(request_dict_json) 
    print(" [.] Got {}".format(response_dict)) 


asyncio.get_event_loop().run_until_complete(rpc_client()) 

server.py:タスクがスケジュールされている場合

""" 
CoAP RPC server, based on aioamqp implementation of RPC examples from RabbitMQ tutorial 
""" 

import base64 
import json 
import sys 

import logging 
import warnings 

import asyncio 
import aioamqp 
import aiocoap 

amqp_protocol = None 
coap_client_context = None 
obs_tasks = [] 

AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME = 'topic_coap' 
AMQP_COAP_NOTIFICATIONS_TOPIC_NAME = 'topic' 
AMQP_COAP_NOTIFICATIONS_ROUTING_KEY = 'coap.response' 

def create_response_dict(coap_request, coap_response): 
    response_dict = {'request_uri': "", 'code': 0} 
    response_dict['request_uri'] = coap_request.get_request_uri() 
    response_dict['code'] = coap_response.code 

    if len(coap_response.payload) > 0: 
     response_dict['payload'] = base64.b64encode(coap_response.payload).decode('utf-8') 

    return response_dict 


async def handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response): 
    # create response dict: 
    response_dict = create_response_dict(coap_request, coap_response) 
    message = json.dumps(response_dict) 

    # create new channel: 
    global amqp_protocol 
    amqp_channel = await amqp_protocol.channel() 

    await amqp_channel.basic_publish(
     payload=message, 
     exchange_name='', 
     routing_key=amqp_properties.reply_to, 
     properties={ 
      'correlation_id': amqp_properties.correlation_id, 
     }, 
    ) 

    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag) 

    print(" [.] handle_coap_response() published response: {}".format(response_dict)) 


def incoming_observation(coap_request, coap_response): 
    asyncio.async(handle_coap_notification(coap_request, coap_response)) 


async def handle_coap_notification(coap_request, coap_response): 
    # create response dict: 
    response_dict = create_response_dict(coap_request, coap_response) 
    message = json.dumps(response_dict) 

    # create new channel: 
    global amqp_protocol 
    amqp_channel = await amqp_protocol.channel() 

    await amqp_channel.exchange(AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, AMQP_COAP_NOTIFICATIONS_TOPIC_NAME) 

    await amqp_channel.publish(message, exchange_name=AMQP_COAP_NOTIFICATIONS_EXCHANGE_NAME, routing_key=AMQP_COAP_NOTIFICATIONS_ROUTING_KEY) 

    print(" [.] handle_coap_notification() published response: {}".format(response_dict)) 


async def send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request): 
    observation_is_over = asyncio.Future() 
    try: 
     global coap_client_context 
     requester = coap_client_context.request(coap_request) 
     requester.observation.register_errback(observation_is_over.set_result) 
     requester.observation.register_callback(lambda data, coap_request=coap_request: incoming_observation(coap_request, data)) 

     try: 
      print(" [..] Sending CoAP obs request: {}".format(request_dict)) 
      coap_response = await requester.response 
     except socket.gaierror as e: 
      print("Name resolution error:", e, file=sys.stderr) 
      return 
     except OSError as e: 
      print("Error:", e, file=sys.stderr) 
      return 

     if coap_response.code.is_successful(): 
      print(" [..] Received CoAP response: {}".format(coap_response)) 
      await handle_coap_response(amqp_envelope, amqp_properties, coap_request, coap_response) 
     else: 
      print(coap_response.code, file=sys.stderr) 
      if coap_response.payload: 
       print(coap_response.payload.decode('utf-8'), file=sys.stderr) 
      sys.exit(1) 

     exit_reason = await observation_is_over 
     print("Observation is over: %r"%(exit_reason,), file=sys.stderr) 

    finally: 
     if not requester.response.done(): 
      requester.response.cancel() 
     if not requester.observation.cancelled: 
      requester.observation.cancel() 


async def on_rpc_request(amqp_channel, amqp_body, amqp_envelope, amqp_properties): 
    print(" [.] on_rpc_request(): received RPC request: {}".format(amqp_body)) 

    request_dict = {} # hardcoded to vdna.be for SO example 
    aiocoap_code = aiocoap.GET 
    aiocoap_uri = "coap://vdna.be/obs" 
    aiocoap_payload = "" 

    # as we are ready to send the CoAP request, ack the client already indicating we have received the RPC request 
    await amqp_channel.basic_client_ack(delivery_tag=amqp_envelope.delivery_tag) 

    coap_request = aiocoap.Message(code=aiocoap_code, uri=aiocoap_uri, payload=aiocoap_payload) 
    coap_request.opt.observe = 0 

    task = asyncio.ensure_future(send_coap_obs_request(amqp_envelope, amqp_properties, request_dict, coap_request)) 
    # we have to keep a hard ref to this task, otherwise the python garbage collector destroyes the task before it is completed. See https://bugs.python.org/issue21163 
    # this is apparent from the "Task was destroyed but it is pending" exception thrown after random (lengthy) time intervals, probably the time interval is related to when the gc is triggered 
    # await task # this does not seem to work, as it prevents new amqp operations from executing (e.g. amqp channels do not get created) 
    # we are actually not interested in waiting for the task anyway, so instead just keep a hard ref to the task in the obs_tasks list 
    obs_tasks.append(task) # TODO: when do we remove the task from the list? 


async def amqp_connect(): 
    try: 
     (transport, protocol) = await aioamqp.connect('localhost', 5672) 
     print(" [x] Connected to AMQP broker") 
     return (transport, protocol) 
    except aioamqp.AmqpClosedConnection as ex: 
     print("closed connections: {}".format(ex)) 
     raise ex 


async def main(): 
    """Open AMQP connection to broker, subscribe to coap_request_rpc_queue and setup aiocoap client context """ 

    try: 
     global amqp_protocol 
     (amqp_transport, amqp_protocol) = await amqp_connect() 

     channel = await amqp_protocol.channel() 

     await channel.queue_declare(queue_name='coap_request_rpc_queue') 
     await channel.basic_qos(prefetch_count=10, prefetch_size=0, connection_global=False) 
     await channel.basic_consume(on_rpc_request, queue_name='coap_request_rpc_queue') 

     print(" [x] Awaiting CoAP request RPC requests") 
    except aioamqp.AmqpClosedConnection as ex: 
     print("amqp_connect: closed connections: {}".format(ex)) 
     exit() 

    global coap_client_context 
    coap_client_context = await aiocoap.Context.create_client_context() 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 

    loop.set_debug(True) 

    asyncio.async(main()) 
    loop.run_forever() 

答えて

0

、それは_stepコールバックがループに予定されているのです。そのコールバックは、selfによってタスクへの参照を保持します。私はコードをチェックしていませんが、私はループがそのコールバックへの参照を保持しているという確信があります。ただし、待ち時間や未来を待つタスクがある場合は、_stepコールバックはスケジュールされません。その場合、タスクはタスクへの参照を保持するdoneコールバックを追加しますが、ループは未来を待っているタスクへの参照を保持しません。

タスクが待っている未来への参照を保持している限り、すべて正常です。しかし、未来へのハードリファレンスを保持しているものがなければ、未来はガベージコレクションを得ることができ、そのときタスクはガベージコレクションを得ることができます。

私はあなたのタスクが未来が待っているところで呼び出すものを参照していないかもしれません。 一般的に、未来を参照する必要があるので、誰かがその結果を最終的に設定できるので、未参照の未来がある場合はバグかもしれません。

関連する問題