2017-06-17 17 views
0

async/await構文をTwisted Deferred.addCallbackメソッドで使用したいと思います。しかし、ドキュメントに記載されているように、addCallbackコールバックは同期的に呼び出されます。私はinlineCallbacksデコレータがこの目的のために使われているのを見てきましたが、async/awaitの構文を使用することをお勧めします(それが可能であれば意味があります)。Twistedコールバックでasync/await構文を使用する

は私がpika documentationから元のコードを拾ったが、私はそれを移行しようと運がなかったと非同期/のawait構文:

import pika 
from pika import exceptions 
from pika.adapters import twisted_connection 
from twisted.internet import defer, reactor, protocol, task 


async def run_async(connection): 
    channel = await connection.channel() 
    exchange = await channel.exchange_declare(exchange='topic_link',type='topic') 
    queue = await channel.queue_declare(queue='hello', auto_delete=False, exclusive=False) 
    await channel.queue_bind(exchange='topic_link', queue='hello', routing_key='hello.world') 
    await channel.basic_qos(prefetch_count=1) 
    queue_object, consumer_tag = await channel.basic_consume(queue='hello', no_ack=False) 
    l = task.LoopingCall(read_async, queue_object) 
    l.start(0.01) 


async def read_async(queue_object): 
    ch,method,properties,body = await queue_object.get() 
    if body: 
     print(body) 
    await ch.basic_ack(delivery_tag=method.delivery_tag) 


parameters = pika.ConnectionParameters() 
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) 
d = cc.connectTCP('rabbitmq', 5672) 
d.addCallback(lambda protocol: protocol.ready) 
d.addCallback(run_async) 
reactor.run() 

誰もrun_async機能をお待ちしていないので、これは明らかに動作しません。

+1

['ensureDeferred'](https://twistedmatrix.com/documents/current/core/howto/defer-intro.html#coroutines-with-async-await)を使用します。 –

+0

@ notorious.no私は 'cc.connectTCP( 'rabbitmq'、5672)'を 'defer.ensureDeferred'でラップしようとしましたが、それは役に立たなかった。それはあなたの意味ですか?ありがとう。 – user1527491

+0

@ notorious.noはそれを得ました。私はensureDeferredでコールバック自体をラップする必要があります。私はobvisouslyそれをしないコールバックの結果をラップしていた。ありがとう。 – user1527491

答えて

2

notorious.noとTwistedのドキュメントで指摘されているように、ensureDeferredは方法です。それでも、コールバックの結果をラップする必要があり、コールバック自体はわかりませんでした。

これは、それが最終的にどのように見えるかです:

def ensure_deferred(f): 
    @functools.wraps(f) 
    def wrapper(*args, **kwargs): 
     result = f(*args, **kwargs) 
     return defer.ensureDeferred(result) 
    return wrapper 


@ensure_deferred 
async def run(connection): 
    channel = await connection.channel() 
    exchange = await channel.exchange_declare(exchange='topic_link', type='topic') 
    queue = await channel.queue_declare(queue='hello', auto_delete=False, exclusive=False) 
    await channel.queue_bind(exchange='topic_link', queue='hello', routing_key='hello.world') 
    await channel.basic_qos(prefetch_count=1) 
    queue_object, consumer_tag = await channel.basic_consume(queue='hello', no_ack=False) 
    l = task.LoopingCall(read, queue_object) 
    l.start(0.01) 


@ensure_deferred 
async def read(queue_object): 
    ch, method, properties, body = await queue_object.get() 
    if body: 
     print(body) 
    await ch.basic_ack(delivery_tag=method.delivery_tag) 


parameters = pika.ConnectionParameters() 
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) 
d = cc.connectTCP('rabbitmq', 5672) 
d.addCallback(lambda protocol: protocol.ready) 
d.addCallback(run) 
reactor.run() 

感謝。

関連する問題