2017-05-18 18 views
0

セロリのビートを使って定期的なタスクを設定しました。タスクが実行され、結果がコンソールに表示されます。 タスクによってスローされた結果を再収集するpythonスクリプトが必要です。セロリのタスクIDの取得方法

私はこのようにそれを行うことができます:

#client.py 
from cfg_celery import app 
task_id = '337fef7e-68a6-47b3-a16f-1015be50b0bc' 
try: 
    x = app.AsyncResult(id) 
    print(x.get()) 
except: 
    print('some error') 

をとにかく、あなたが見ることができるように、このテストのために私はセロリビートコンソール(そう言うこと)で投げtask_idをコピーし、それをハードコーディングしなければならなかった私のスクリプト。明らかに、これは実際の生産では機能しません。

私はそれはセロリの設定ファイルにtask_idを設定ハッキング:このアプローチの問題は、私が以前を失うということです

#client.py 
from cfg_celery import app 
task_id = 'my_custom_id' 
try: 
    x = app.AsyncResult(id) 
    print(x.get()) 
except: 
    print('some error') 

#cfg_celery.py 
app = Celery('celery_config', 
     broker='redis://localhost:6379/0', 
     include=['taskos'], 
     backend = 'redis' 
     ) 
app.conf.beat_schedule = { 
    'something': { 
     'task': 'tasks.add', 
     'schedule': 10.0, 
     'args': (16, 54), 
     'options' : {'task_id':"my_custom_id"}, 
    } 
} 

私はこのようにそれを読むことができますこの方法結果(client.pyの前)

私はセロリのバックエンドでtask_id年代のリストを読み込むことができますいくつかの方法はありますか? 複数の定期的なタスクがある場合、各定期タスクからtask_idのリストを取得できますか? これを行うにはapp.tasks.key()を使用できますか?

PD:私は間違っているいくつかの用語を使用した場合、英語圏のネイティブを、プラスセロリに新しいものではない、素晴らしいこと。

+0

私は(私は他のpythonのインスタンスから結果をつかむために)私が欲しいものを得るために私は値を格納し、取得するためにredis関数を使用する必要があることになった。私はzaddを使用して結果をredisに書き出し、 'client.py'にzrangeを使用してそれらを取得します。 –

答えて

0

OK。私の質問があまりにも愚かであるために誰もこれに答えなかったかどうかは分かりません。 とにかく、私がしたいのは、私の「セロリビート」タスクの結果を別のpythonプロセスから取得することです。 同じプロセスになっていて問題はありませんでした。私はタスクIDにアクセスできました。そこからすべてが簡単でした。しかし、他のプロセスからは、完成したタスクのリストを取得する方法が見つかりませんでした。

私のpython-RQを試してみました(それはいいです)が、私はどちらか私は手動でRedisのストレージ機能を利用するために持っていたことを理解するようになったことを行うことができませんでしたRQを使用していることを見たとき。

:だから私はこれをやって、私が何を望むかです。タスク関数内から逆解析できるようにするには、 'bind = True'を使用します。 。関数の結果が得られたら、私はredisのリストに書きます(私はこのリストのサイズを制限するトリックを作った) 。今では、独立したプロセスから同じRedisサーバーに接続し、そのようなリストに格納された結果を取得できます。

cfg_celery.py:

私のファイルは、次のようになってしまった、ここで私は、タスクが呼び出されることしようとしている方法を定義します。

#cfg_celery.py 
from celery import Celery 

appo = Celery('celery_config', 
     broker='redis://localhost:6379/0', 
     include=['taskos'], 
     backend = 'redis' 
     ) 

''' 
urlea se decoro como periodic_task. no hay necesidad de darla de alta aqi. 
pero como add necesita args, la doy de alta manualmente p pasarselos 
''' 
appo.conf.beat_schedule = { 
    'q_loco': { 
     'task': 'taskos.add', 
     'schedule': 10.0, 
     'args': (16, 54), 
     # 'options' : {'task_id':"lcura"}, 
    } 
} 

taskos.py:これらはタスクです。

#taskos.py 
from cfg_celery import appo 
from celery.decorators import periodic_task 
from redis import Redis 

from datetime import timedelta 
import requests, time 

rds = Redis() 

@appo.task(bind=True) 
def add(self,a, b): 
    #result of operation. very dummy. 
    result = a + b 

    #storing in redis 
    r= (self.request.id,time.time(),result) 
    rds.lpush('my_results',r) 

    # for this test i want to have at most 5 results stored in redis 
    long = rds.llen('my_results') 
    while long > 5: 
     x = rds.rpop('my_results') 
     print('popping out',x) 
     long = rds.llen('my_results') 
     time.sleep(1) 
    return a + b 


@periodic_task(run_every=20) 
def urlea(url='https://www.fullstackpython.com/'): 
    inicio = time.time() 
    R = dict() 
    try: 
     resp = requests.get(url) 
     R['vato'] = url+" = " + str(resp.status_code*10) 
     R['num palabras'] = len(resp.text.split()) 
    except: 
     R['vato'] = None 
     R['num palabras'] = 0   
    print('u {} : {}'.format(url,time.time()-inicio)) 
    time.sleep(0.8) # truco pq se vea mas claramente la dif. 
    return R 

consumer.py:結果を得るための独立したプロセス。

#consumer.py 
from redis import Redis 
nombre_lista = 'my_results' 

rds = Redis() 

tamaño = rds.llen(nombre_lista) 
ultimos_resultados = list() 
for i in range(tamaño): 
    ultimos_resultados.append(rds.rpop(nombre_lista)) 

print(ultimos_resultados) 

私はプログラミングには比較的新しいので、私はこの回答が私のようなnoobsに役立つことを願っています。私が何か間違っていたら、必要に応じて訂正をしてください。

関連する問題