2017-07-31 15 views
1

pikaとスレッドモジュールを使用するPython 3.6スクリプトを開発しようとしています。Python - 別のスレッドで実行される関数間で関数(コールバック)変数を渡す

私はA)はPythonとPythonにはまったく新しく、B)別々のスレッドで実行され、すでに渡されているときに関数間で変数を渡す方法を理解していないという問題があります。受信関数名の最後のカッコ内のパラメータ。

私はこれを考えている理由は、スレッディングを使用しないと、受信関数名を呼び出すだけで関数間で変数を渡すことができるため、渡す変数を括弧で括って、基本的な例を示します以下:

def send_variable(): 
    body = "this is a text string" 
    receive_variable(body) 

def receive_variable(body): 
    print(body) 

この実行、版画:

this is a text string 

私はスレッドでの作業を取得するために必要なコードの作業バージョンを以下に示している - これはストレートの機能を使用しています(ノースレッド)とI pikaを使って(RabbitMQ)qからメッセージを受信して​​いますしかし私は、スクリプト内で実行するために、これを翻訳したい。これは、正常に動作します

import pika 
...mq connection variables set here... 


# defines username and password credentials as variables set at the top of this script 
    credentials = pika.PlainCredentials(mq_user_name, mq_pass_word) 

# defines mq server host, port and user credentials and creates a connection 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials)) 

# creates a channel connection instance using the above settings 
    channel = connection.channel() 

# defines the queue name to be used with the above channel connection instance 
    channel.queue_declare(queue=mq_queue) 


def callback(ch, method, properties, body): 

# passes (body) to processing function 
    body_processing(body) 

# sets channel consume type, also sets queue name/message acknowledge settings based on variables set at top of script 
    channel.basic_consume(callback, queue=mq_queue, no_ack=mq_no_ack) 
# tells the callback function to start consuming 
    channel.start_consuming() 
# calls the callback function to start receiving messages from mq server 
    callback() 
# above deals with pika connection and the main callback function 


def body_processing(body): 
    ...code to send a pika message every time a 'body' message is received... 

:ナキウサギコールバック関数を介したueueは、私は、「処理機能」に「コールバック」関数で受信したメッセージのボディを渡しますスレッドを使用します。私はこれを行うときに、自分自身のスレッドで実行される関数名にパラメータ 'channel'を指定する必要があります。次に 'processing_function'が以下のようになるように 'body'パラメータを含めようとします:

def processing_function(channel, body): 

私はというエラーを取得:

[function_name] is missing 1 positional argument: 'body' 

は、私がスレッドを使用する際に必要な多くのコードがあることを知って、あなたは私がやっているものを見ることができるように、私は以下のスレッドに使用する実際のコードが含まれています:

...imports and mq variables and pika connection details are set here... 

def get_heartbeats(channel): 
channel.queue_declare(queue=queue1) 
#print (' [*] Waiting for messages. To exit press CTRL+C') 

    def callback(ch, method, properties, body): 

     process_body(body) 

     #print (" Received %s" % (body)) 

    channel.basic_consume(callback, queue=queue1, no_ack=no_ack) 
    channel.start_consuming() 


def process_body(channel, body): 
    channel.queue_declare(queue=queue2) 
    #print (' [*] Waiting for Tick messages. To exit press CTRL+C') 

# sets the mq host which pika client will use to send a message to 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host)) 
# create a channel connection instance 
    channel = connection.channel() 
# declare a queue to be used by the channel connection instance 
    channel.queue_declare(queue=order_send_queue) 
# send a message via the above channel connection settings 
    channel.basic_publish(exchange='', routing_key=send_queue, body='Test Message') 
# send a message via the above channel settings 
# close the channel connection instance 
    connection.close() 


def manager(): 

# Channel 1 Connection Details - ======================================================================================= 

    credentials = pika.PlainCredentials(mq_user_name, mq_password) 
    connection1 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials)) 
    channel1 = connection1.channel() 

# Channel 1 thread ===================================================================================================== 
    t1 = threading.Thread(target=get_heartbeats, args=(channel1,)) 
    t1.daemon = True 
    threads.append(t1) 
    # as this is thread 1 call to start threading is made at start threading section 

# Channel 2 Connection Details - ======================================================================================= 

    credentials = pika.PlainCredentials(mq_user_name, mq_password) 
    connection2 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials)) 
    channel2 = connection2.channel() 

# Channel 2 thread ==================================================================================================== 
    t2 = threading.Thread(target=process_body, args=(channel2, body)) 
    t2.daemon = True 
    threads.append(t2) 
    t2.start() # as this is thread 2 - we need to start the thread here 

# Start threading 
t1.start() # start the first thread - other threads will self start as they call t1.start() in their code block 
for t in threads: # for all the threads defined 
    t.join() # join defined threads 

manager() # run the manager module which starts threads that call each module 

この実行がエラー

process_body() missing 1 required positional argument: (body) 

を生成し、これがあるか、それを修正する方法を、なぜ私は理解していません。

この質問を読んでいただきありがとうございます。ご提供いただけるヘルプやアドバイスは大変ありがとうございます。

私はPythonの新機能であり、コード化されているので、もっと謎めいた返事を理解するのではなく、綴りが必要な場合があります。

ありがとうございます!

def process_body(channel, body): 

def process_body(body): 

t2 = threading.Thread(target=process_body, args=(channel2, body)) 

ように読むために:さらに、このに見て、私が行を編集している場合と思われるコードで遊んで

答えて

1

それは読む:

t2 = threading.Thread(target=process_body) 

コードが必要に応じて動作するようです - htopで複数のスクリプトプロセスも見られるので、スレッド処理が動作しているように見えます - スクリプト処理を24時間以上残してエラーは発生しませんでした...

関連する問題