2017-02-13 10 views
1

私は、比較的複雑なセロリチェーン構成に固執しており、以下を達成しようとしています。次のようなタスクのチェーンがありますと仮定しますセロリチェーンでグループ結果を使用する

chain1 = chain(
    DownloadFile.s("http://someserver/file.gz"), # downloads file, returns temp file name 
    UnpackFile.s(), # unpacks the gzip comp'd file, returns temp file name 
    ParseFile.s(), # parses file, returns list URLs to download 
) 

今、私は私がやったことだった、並列に各URLをダウンロードします:

urls = chain1.get() 
download_tasks = map(lambda x: DownloadFile.s(x), urls) 
res1 = celery.group(download_tasks)() 
res1_data = res1.get() 

は最後に、私は(それぞれダウンロードしたファイルを取りたいです

chains = [] 
for tmpfile in res: 
    chains.append(celery.chain(
     foo.s(tmpfile), 
     bar.s(), 
     baz.s() 
    )) 

res2 = celery.group(*chains)() 
res2_data = res2.get() 
:一時ファイル名) DownloadFileから返さ ParseFileから返されると並行して、タスクの別の鎖を介して実行される(例えば、それは group chainのS)となります

私はchain1の結果を待つことができるので、通常のPythonプロセス(別のセロリタスクではありません)で実行した場合、アプローチはうまく動作し、ダウンロードタスクグループと新しいダウンロードチェーンごとのチェーンを構築します。

しかし、今私は別の@app.task飾ら機能でそれをラップすることにより、別のセロリタスクにすべてのこのようなものをラップしたい、そしてそれはあなたが呼び出すことはできません(または、実際に内部から.get()を呼び出すべきではありませんが判明他のタスクが完了するのを待ちます)、このワークフローをタスク内で実行するための「移植」ソリューションを見つけることができませんでした。私はchain1チェーンにres1を追加しようとしましたが、セロリーは<GroupResult: ..... > is not JSON serializableという文句を言います。

誰かがそれを動作させる方法を提案できますか?ありがとう!

+0

あなたfoo_bar_baz処理の結果によって興味がありますか?つまり、処理が完了したことを知るだけで十分です(たとえば、ファイルを消去してどこかに保存するなど) – arthur

答えて

1

実際に.get()をタスク内で呼び出すのは悪いです。目標Celeryは、非同期タスクを並行して実行することで、結果を待つべきではありません。

問題を解決する1つの方法は、最初の処理(ファイルまたはデータベースのいずれか)のURL結果を保存することです。

私は、結果をファイルに書き込むことでできることの短い例を書いています。私はjsonダンピングを選んだ。

mainurlsのリストがあるとします。最初に、groupchainのすべてのURLを非同期処理で起動します。 これらのタスクはすべて、urlsを処理し、指定されたtmpディレクトリにあるファイルにダウンロードするURLのリストを格納します。

また、ファイルが書き込まれているかどうかをチェックするこのタスクでは、すべてのファイルを処理し、対応するファイルをtmpディレクトリで削除します。

私が選んだパラメータでは、このタスクは30秒ごとに自動再試行され、決して終了しません(私はあなたには復活する仕事があると思います)ので、これを変更するかもしれませんが、

私はそれをmainとして実行しましたが、必要に応じて別のセロリのタスクにラップすることもできます。

app_module.py

from __future__ import absolute_import 
from celery import Celery 

app = Celery('app') 
app.config_from_object("settings") 

if __name__ == '__main__': 
    app.start() 

タスク。PY

from celery import group, chain 
from app_module import app 

import json 
import glob 
import os 

__all__ = ('download_file', 
       'unpack_file', 
       'parse_file', 
       'foo', 
       'bar', 
       'process_downloaded_file', 
       'check_dir',) 

path = "./data/tmp_dir/" 

@app.task 
def download_file(filename): 
    return filename 

@app.task 
def unpack_file(filename): 
    return "unzipped_" + filename 

@app.task 
def parse_file(filename): 
    # Fake parse task storing results in a temp directory 
    # results are stored in a json and contains the list of urls 
    with open(path + filename, "wb") as f: 
     d = {"files" : [filename+"_" + str(i) for i in range(0,5)]} 
     json.dump(d, f) 
    return True 

@app.task 
def foo(filename): 
    return "foo_" + filename 

@app.task 
def bar(filename): 
    return "bar_" + filename 


@app.task 
def process_downloaded_file(filename): 
    #process one file in the temp directory and at the end delete the file so it 
    # is not processed several times 
    with open(filename, "rb") as f: 
     d = json.load(f) 
    g = group(chain(download_file.s(f), foo.s(), bar.s()) for f in d["files"]).apply_async() 
    os.remove(filename) 
    return True 

@app.task(bind=True) 
def check_dir(self, tmp_dir, sleep=30): 
    #this task checks the tmp directory. If files have been written it processes 
    #every file in the directory. The task autoretries each *sleep* seconds 
    for f in glob.glob(tmp_dir + "*"): 
     process_downloaded_file.delay(f) 
    self.retry(args=(tmp_dir, sleep), countdown=sleep) 

main.py

from celery import group, chain 
from tasks import * 

path = "./data/tmp_dir/" 
urls = ["file1", "file2"] 
group(chain(download_file.s(f), unpack_file.s(), parse_file.s()) for f in urls).apply_async() 
check_dir.delay(path) 

コンソール出力:

[2017-02-14 18:10:41,630: INFO/MainProcess] Received task: arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2] 
[2017-02-14 18:10:41,632: INFO/MainProcess] Received task: arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827] 
[2017-02-14 18:10:41,637: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] 
[2017-02-14 18:10:41,666: INFO/MainProcess] Received task: arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805] 
[2017-02-14 18:10:41,674: INFO/MainProcess] Task arthur.tasks.download_file[65cb06c6-b8b6-4108-af36-84103037e4a2] succeeded in 0.0389260330703s: u'file1' 
[2017-02-14 18:10:41,682: INFO/MainProcess] Received task: arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e] 
[2017-02-14 18:10:41,689: INFO/MainProcess] Task arthur.tasks.download_file[d069e046-4153-4320-8f9d-a22adeeb2827] succeeded in 0.0534016339807s: u'file2' 
[2017-02-14 18:10:41,691: INFO/MainProcess] Received task: arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7] 
[2017-02-14 18:10:41,696: INFO/MainProcess] Task arthur.tasks.unpack_file[47b13b21-57e2-44be-82dd-f8e0e1adff2e] succeeded in 0.00816849502735s: u'unzipped_file2' 
[2017-02-14 18:10:41,704: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207] 
[2017-02-14 18:10:41,706: INFO/MainProcess] Task arthur.tasks.parse_file[bd3fa287-9cf0-4802-88ca-2593c27af4f7] succeeded in 0.00894999306183s: True 
[2017-02-14 18:10:41,708: INFO/MainProcess] Task arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] retry: Retry in 30s 
[2017-02-14 18:10:41,709: INFO/MainProcess] Received task: arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478] 
[2017-02-14 18:10:41,713: INFO/MainProcess] Task arthur.tasks.unpack_file[e9eab102-8ae0-4000-b384-5cfa0e01e805] succeeded in 0.044072615914s: u'unzipped_file1' 
[2017-02-14 18:10:41,714: INFO/MainProcess] Received task: arthur.tasks.check_dir[19a1984f-c2ed-4de6-82d6-b5ad5a6bacc5] eta:[2017-02-14 17:11:11.692241+00:00] 
[2017-02-14 18:10:41,717: INFO/MainProcess] Received task: arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f] 
[2017-02-14 18:10:41,720: INFO/MainProcess] Received task: arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104] 
[2017-02-14 18:10:41,724: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1b72f409-f5b5-480a-b651-616dc02b2207] succeeded in 0.0153999190079s: True 
[2017-02-14 18:10:41,725: INFO/MainProcess] Task arthur.tasks.parse_file[e839826a-dfa5-4df0-a716-9c21371c297f] succeeded in 0.00395095907152s: True 
[2017-02-14 18:10:41,726: INFO/MainProcess] Task arthur.tasks.download_file[743153f9-9c92-430e-84f5-7d99a269c104] succeeded in 0.00449692492839s: u'unzipped_file1_0' 
[2017-02-14 18:10:41,727: INFO/MainProcess] Received task: arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea] 
[2017-02-14 18:10:41,728: INFO/MainProcess] Task arthur.tasks.process_downloaded_file[1d8d340f-61f7-4ef3-a90e-913a3bfb5478] succeeded in 0.0129376259865s: True 
[2017-02-14 18:10:41,729: INFO/MainProcess] Received task: arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1] 
[2017-02-14 18:10:41,731: INFO/MainProcess] Received task: arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed] 
[2017-02-14 18:10:41,733: INFO/MainProcess] Task arthur.tasks.download_file[a29470d7-85a0-4a91-a410-2e51cff81cea] succeeded in 0.003385586082s: u'unzipped_file1_1' 
[2017-02-14 18:10:41,734: INFO/MainProcess] Task arthur.tasks.download_file[44a1cc48-52a4-4548-a862-48d402dd92f1] succeeded in 0.00395720102824s: u'unzipped_file1_2' 
[2017-02-14 18:10:41,735: INFO/MainProcess] Received task: arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce] 
[2017-02-14 18:10:41,739: INFO/MainProcess] Task arthur.tasks.download_file[d93a7260-43dc-4e77-b5ff-ce0e3bc426ce] succeeded in 0.00272180500906s: u'unzipped_file1_4' 
[2017-02-14 18:10:41,740: INFO/MainProcess] Task arthur.tasks.download_file[cedf91b2-6e3f-48c3-880e-b80a1c38efed] succeeded in 0.00340146606322s: u'unzipped_file1_3' 
[2017-02-14 18:10:41,740: INFO/MainProcess] Received task: arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad] 
[2017-02-14 18:10:41,742: INFO/MainProcess] Received task: arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0] 
[2017-02-14 18:10:41,745: INFO/MainProcess] Received task: arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655] 
[2017-02-14 18:10:41,747: INFO/MainProcess] Task arthur.tasks.download_file[f1b19d02-a97d-4e32-afde-e39d46d45bad] succeeded in 0.00358341098763s: u'unzipped_file2_0' 
[2017-02-14 18:10:41,748: INFO/MainProcess] Task arthur.tasks.download_file[4a0bce55-8662-42a6-a19d-3ff33496d7e0] succeeded in 0.0044348789379s: u'unzipped_file2_1' 
[2017-02-14 18:10:41,749: INFO/MainProcess] Received task: arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd] 
[2017-02-14 18:10:41,752: INFO/MainProcess] Received task: arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d] 
[2017-02-14 18:10:41,754: INFO/MainProcess] Task arthur.tasks.download_file[a759d6a1-a558-46ba-8ee1-2cb28cbe0655] succeeded in 0.00349929102231s: u'unzipped_file2_2' 
[2017-02-14 18:10:41,755: INFO/MainProcess] Task arthur.tasks.foo[e3250c36-92e9-4f53-afef-fe95b035e0dd] succeeded in 0.00417044304777s: u'foo_unzipped_file1_0' 
[2017-02-14 18:10:41,755: INFO/MainProcess] Received task: arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c] 
[2017-02-14 18:10:41,757: INFO/MainProcess] Received task: arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf] 
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[3e9db0d1-31c5-4703-8e9d-c2b9f4237d8d] succeeded in 0.00325334002264s: u'unzipped_file2_3' 
[2017-02-14 18:10:41,760: INFO/MainProcess] Task arthur.tasks.download_file[dcda209f-f4be-4697-84c1-e55a8502a45c] succeeded in 0.00384710694198s: u'unzipped_file2_4' 
[2017-02-14 18:10:41,761: INFO/MainProcess] Received task: arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc] 
[2017-02-14 18:10:41,764: INFO/MainProcess] Received task: arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc] 
[2017-02-14 18:10:41,765: INFO/MainProcess] Task arthur.tasks.foo[3e9db173-7200-4c46-aade-72be5553b0cf] succeeded in 0.00316555600148s: u'foo_unzipped_file1_1' 
[2017-02-14 18:10:41,766: INFO/MainProcess] Task arthur.tasks.foo[d936cddc-027c-4640-8a0b-26a7d9723ccc] succeeded in 0.00383736204822s: u'foo_unzipped_file1_2' 
[2017-02-14 18:10:41,767: INFO/MainProcess] Received task: arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714] 
[2017-02-14 18:10:41,769: INFO/MainProcess] Received task: arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c] 
[2017-02-14 18:10:41,771: INFO/MainProcess] Task arthur.tasks.foo[11ae4aef-1af9-43a0-94b8-7b95575cd1bc] succeeded in 0.00347809505183s: u'foo_unzipped_file1_3' 
[2017-02-14 18:10:41,772: INFO/MainProcess] Task arthur.tasks.foo[9e60adad-57e2-4a6e-874d-c687df189714] succeeded in 0.00403305899817s: u'foo_unzipped_file1_4' 
[2017-02-14 18:10:41,773: INFO/MainProcess] Received task: arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f] 
[2017-02-14 18:10:41,775: INFO/MainProcess] Received task: arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e] 
[2017-02-14 18:10:41,777: INFO/MainProcess] Task arthur.tasks.foo[8b1eebb8-abb0-4223-872c-e9687031380c] succeeded in 0.00311726506334s: u'foo_unzipped_file2_0' 
[2017-02-14 18:10:41,778: INFO/MainProcess] Task arthur.tasks.foo[f9c137d7-4087-4519-919d-62bba457747f] succeeded in 0.00378636294045s: u'foo_unzipped_file2_1' 
[2017-02-14 18:10:41,778: INFO/MainProcess] Received task: arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78] 
[2017-02-14 18:10:41,780: INFO/MainProcess] Received task: arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23] 
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.foo[2a43d460-aceb-465e-8be5-678cb930a60e] succeeded in 0.00324743904639s: u'foo_unzipped_file2_2' 
[2017-02-14 18:10:41,783: INFO/MainProcess] Task arthur.tasks.bar[770d4cd4-527c-4efe-975f-daf337934c78] succeeded in 0.00382692192215s: u'bar_foo_unzipped_file1_0' 
[2017-02-14 18:10:41,784: INFO/MainProcess] Received task: arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07] 
[2017-02-14 18:10:41,787: INFO/MainProcess] Received task: arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171] 
[2017-02-14 18:10:41,788: INFO/MainProcess] Task arthur.tasks.foo[c09677f9-183e-43ef-889c-c8b7cab2bd23] succeeded in 0.00343648903072s: u'foo_unzipped_file2_4' 
[2017-02-14 18:10:41,789: INFO/MainProcess] Task arthur.tasks.foo[1a6294da-8cae-4bf1-9d56-be5972254e07] succeeded in 0.00413183600176s: u'foo_unzipped_file2_3' 
[2017-02-14 18:10:41,790: INFO/MainProcess] Received task: arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947] 
[2017-02-14 18:10:41,792: INFO/MainProcess] Received task: arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512] 
[2017-02-14 18:10:41,794: INFO/MainProcess] Task arthur.tasks.bar[bd15721f-3bea-4c64-a0c4-59c5c8730171] succeeded in 0.0031840458978s: u'bar_foo_unzipped_file1_2' 
[2017-02-14 18:10:41,795: INFO/MainProcess] Task arthur.tasks.bar[29a982bd-2a72-49e7-bc56-0f2a4b2ba947] succeeded in 0.00374374503735s: u'bar_foo_unzipped_file1_1' 
[2017-02-14 18:10:41,796: INFO/MainProcess] Received task: arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8] 
[2017-02-14 18:10:41,798: INFO/MainProcess] Task arthur.tasks.bar[5944c49d-428d-4237-8777-edec76b36512] succeeded in 0.00241802399978s: u'bar_foo_unzipped_file1_4' 
[2017-02-14 18:10:41,798: INFO/MainProcess] Received task: arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d] 
[2017-02-14 18:10:41,801: INFO/MainProcess] Received task: arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e] 
[2017-02-14 18:10:41,803: INFO/MainProcess] Task arthur.tasks.bar[12272aad-f6e6-432e-945a-363a678ba2a8] succeeded in 0.00308170204517s: u'bar_foo_unzipped_file1_3' 
[2017-02-14 18:10:41,804: INFO/MainProcess] Task arthur.tasks.bar[493cc5cc-797b-40f3-87a7-1394af1ae45d] succeeded in 0.00375492009334s: u'bar_foo_unzipped_file2_0' 
[2017-02-14 18:10:41,804: INFO/MainProcess] Received task: arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f] 
[2017-02-14 18:10:41,807: INFO/MainProcess] Received task: arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d] 
[2017-02-14 18:10:41,808: INFO/MainProcess] Task arthur.tasks.bar[c4b4e9de-4ce7-476f-b275-278db3d8099f] succeeded in 0.00304232595954s: u'bar_foo_unzipped_file2_2' 
[2017-02-14 18:10:41,809: INFO/MainProcess] Task arthur.tasks.bar[e2925c2b-426d-4076-8a8c-c67c56a2ab8e] succeeded in 0.00377448496874s: u'bar_foo_unzipped_file2_1' 
[2017-02-14 18:10:41,810: INFO/MainProcess] Received task: arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2] 
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[b0cdb87c-292f-4f14-975c-c7bd4679373d] succeeded in 0.00181642104872s: u'bar_foo_unzipped_file2_4' 
[2017-02-14 18:10:41,813: INFO/MainProcess] Task arthur.tasks.bar[3570e196-7c41-43b1-b7ef-68b2d31f28a2] succeeded in 0.00239081599284s: u'bar_foo_unzipped_file2_3' 
+0

詳細な回答をありがとう、私は実際に似たようなことをしたので、私はそれを受け入れます。しかし、1つの問題は残っています。セロリは偶然、「グループ」内の「チェーン」が終了するのを待たずに結果を返します(グループはチェーンの実行予定があるとすぐに戻ります)。ファイルやデータベースに書き込むことなく、このシナリオで結果を得るうーん、簡単な方法はありません。 – madprogrammer

+0

確かにセロリはグループが終了するのを待っていません。結果を待つ必要がある場合は、セロリが必要なものではない可能性があります。 2番目のステップを開始する前に最初のステップを待つ必要があるため、シナリオに対処するための素晴らしいセロリワークフローはありません。 – arthur

+0

ですが、グループの素晴らしい機能は、保存して復元することです。例えば ​​'process_downloaded_file'というタスクで' g.save() 'と' print(g.id) 'を実行することができます。次に、タスクにないコードの別の部分で、 'g = app.GroupResult.restore(the_group_id)'と 'g.get()'を実行することができます。 – arthur

関連する問題