pikaとスレッドモジュールを使用するPython 3.6スクリプトを開発しようとしています。Python - 別のスレッドで実行される関数間で関数(コールバック)変数を渡す
私はA)はPythonとPythonにはまったく新しく、B)別々のスレッドで実行され、すでに渡されているときに関数間で変数を渡す方法を理解していないという問題があります。受信関数名の最後のカッコ内のパラメータ。
私はこれを考えている理由は、スレッディングを使用しないと、受信関数名を呼び出すだけで関数間で変数を渡すことができるため、渡す変数を括弧で括って、基本的な例を示します以下:
def send_variable():
body = "this is a text string"
receive_variable(body)
def receive_variable(body):
print(body)
この実行、版画:
this is a text string
私はスレッドでの作業を取得するために必要なコードの作業バージョンを以下に示している - これはストレートの機能を使用しています(ノースレッド)とI pikaを使って(RabbitMQ)qからメッセージを受信していますしかし私は、スクリプト内で実行するために、これを翻訳したい。これは、正常に動作します
import pika
...mq connection variables set here...
# defines username and password credentials as variables set at the top of this script
credentials = pika.PlainCredentials(mq_user_name, mq_pass_word)
# defines mq server host, port and user credentials and creates a connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))
# creates a channel connection instance using the above settings
channel = connection.channel()
# defines the queue name to be used with the above channel connection instance
channel.queue_declare(queue=mq_queue)
def callback(ch, method, properties, body):
# passes (body) to processing function
body_processing(body)
# sets channel consume type, also sets queue name/message acknowledge settings based on variables set at top of script
channel.basic_consume(callback, queue=mq_queue, no_ack=mq_no_ack)
# tells the callback function to start consuming
channel.start_consuming()
# calls the callback function to start receiving messages from mq server
callback()
# above deals with pika connection and the main callback function
def body_processing(body):
...code to send a pika message every time a 'body' message is received...
:ナキウサギコールバック関数を介したueueは、私は、「処理機能」に「コールバック」関数で受信したメッセージのボディを渡しますスレッドを使用します。私はこれを行うときに、自分自身のスレッドで実行される関数名にパラメータ 'channel'を指定する必要があります。次に 'processing_function'が以下のようになるように 'body'パラメータを含めようとします:
def processing_function(channel, body):
私はというエラーを取得:
[function_name] is missing 1 positional argument: 'body'
は、私がスレッドを使用する際に必要な多くのコードがあることを知って、あなたは私がやっているものを見ることができるように、私は以下のスレッドに使用する実際のコードが含まれています:
...imports and mq variables and pika connection details are set here...
def get_heartbeats(channel):
channel.queue_declare(queue=queue1)
#print (' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
process_body(body)
#print (" Received %s" % (body))
channel.basic_consume(callback, queue=queue1, no_ack=no_ack)
channel.start_consuming()
def process_body(channel, body):
channel.queue_declare(queue=queue2)
#print (' [*] Waiting for Tick messages. To exit press CTRL+C')
# sets the mq host which pika client will use to send a message to
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host))
# create a channel connection instance
channel = connection.channel()
# declare a queue to be used by the channel connection instance
channel.queue_declare(queue=order_send_queue)
# send a message via the above channel connection settings
channel.basic_publish(exchange='', routing_key=send_queue, body='Test Message')
# send a message via the above channel settings
# close the channel connection instance
connection.close()
def manager():
# Channel 1 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection1 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel1 = connection1.channel()
# Channel 1 thread =====================================================================================================
t1 = threading.Thread(target=get_heartbeats, args=(channel1,))
t1.daemon = True
threads.append(t1)
# as this is thread 1 call to start threading is made at start threading section
# Channel 2 Connection Details - =======================================================================================
credentials = pika.PlainCredentials(mq_user_name, mq_password)
connection2 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
channel2 = connection2.channel()
# Channel 2 thread ====================================================================================================
t2 = threading.Thread(target=process_body, args=(channel2, body))
t2.daemon = True
threads.append(t2)
t2.start() # as this is thread 2 - we need to start the thread here
# Start threading
t1.start() # start the first thread - other threads will self start as they call t1.start() in their code block
for t in threads: # for all the threads defined
t.join() # join defined threads
manager() # run the manager module which starts threads that call each module
この実行がエラー
process_body() missing 1 required positional argument: (body)
を生成し、これがあるか、それを修正する方法を、なぜ私は理解していません。
この質問を読んでいただきありがとうございます。ご提供いただけるヘルプやアドバイスは大変ありがとうございます。
私はPythonの新機能であり、コード化されているので、もっと謎めいた返事を理解するのではなく、綴りが必要な場合があります。
ありがとうございます!
def process_body(channel, body):
が
def process_body(body):
と
t2 = threading.Thread(target=process_body, args=(channel2, body))
ように読むために:さらに、このに見て、私が行を編集している場合と思われるコードで遊んで