2016-04-07 21 views
0

私はひどく捻挫してしまいました。私は本当に一つのことが必要です - 同じ長さから始まる任意の数の関数を実行し、すべての結果を集めて何らかの処理を行います。ここでPython Twisted - 複数のコールバックを実行

は私が持っているものです。

from twisted.internet import defer 
import time 


# slow computing query 
def process_data(num, data): 
    time.sleep(5) 
    array = [] 
    # mock the results obtained from processed data 
    for i in range(0, 5): 
     array.append(num) 
    return array 

def process_results(arrays): 
    # this should collect return arrays of all callbacks 
    print arrays 

data = [] 
callbacks_refs = [] 
for i in range(0, 5): 
    d=defer.Deferred() 
    d.addCallback(process_data) 
    callbacks_refs.append(d) 

callbacks = defer.DeferredList(callbacks_refs) 
callbacks.addCallback(process_results) 

for i, d in enumerate(callbacks_refs): 
    d.callback(i, data) 

私はループの最後には、(通常の約束と同じように)非同期ですべてのコールバックの実行を開始し、すべての結果がその機能をprocess_resultsに渡されることを期待していましたcallbacks_refsからのすべてのコールバックが完了した後に実行されますが、私はそれにひどく間違っていると感じます。

答えて

2

あなたのサンプルがあなたの実際のコードにどれほど近いか分かりませんが、サンプルコードはTwistedが何をしているのか誤解しています。 Twistedはあなたの同期コードを魔法のように非同期にしません。イベントループをブロックするのはtime.sleepです。 CPUバインド(I/Oバインドとは対照的に)を行う場合は、複数のスレッドまたはプロセスを使用できます。

私はprocess_dataがブロッキング呼び出しであると仮定して、あなたのマルチスレッドに基づいたソリューションを提供します:

import time 
from twisted.internet import defer, task, threads 

# slow computing query 
def process_data(num): 
    time.sleep(5) 
    array = [] 
    # mock the results obtained from processed data 
    for i in range(0, 5): 
     array.append(num) 
    return array 

def process_results(arrays): 
    # this should collect return arrays of all callbacks 
    print arrays 

def main(_): 
    callbacks_refs = [] 
    for i in range(0, 5): 
     callbacks_refs.append(threads.deferToThread(process_data, i)) 
    callbacks = defer.DeferredList(callbacks_refs) 
    callbacks.addCallback(process_results) 
    return callbacks 

task.react(main) 

私はまた、あなたのツイストプログラミングについて1一般的なアドバイスを与えるだろう - あなた自身がd = defer.Deferred()を入力して検索する場合は、何かをデザインに間違いがある可能性があります。

0

回避策があるかどうかわかりませんが、defer.callback()の方法では、間違ったパラメータがコールバックに渡されています。

コールバックとともにエラーバックを添付した場合、失敗した結果の読み込み中ですが、正常に動作していますが、期待どおりに動作していない可能性があります。

2つの修正があります。

from functools import partial 
for i in range(0, 5): 
    d=defer.Deferred() 
    d.addCallback(partial(process_data,i,data[i])) 
    # This partial is still kinda crooked, but hopefully I have made my point 
    callbacks_refs.append(d) 

か、あなたのコールバック関数

# slow computing query 
def process_data(data_dict): 
    #data_dict['num'] 
    #data_dict['data'] 

#...and further down 
d.callback({'num':4,'data':(1,2,3)}) 

にデータを渡す方法を変更私は申し訳ありませんが、私はdeferredlistにそれほど慣れていないけど、私はあなたがDeferredの作り方を修正したら、deferredlistがかもしれないと思います自動的に作業します。

関連する問題