私はPython 2.7(sigh)、celery == 3.1.19、librabbitmq == 1.6.1、rabbitmq-server-3.5.6-1.noarch、およびredis 2.8.24(redis-cli infoから)を使用しています。 。セロリ、RabbitMQ、Redis:セロリメッセージは交換に入りますが、キューには入りませんか?
セロリの生産者からセロリの消費者にメッセージを送信し、その結果を生産者に戻そうとしています。 1人のプロデューサーと1人の消費者がいますが、2人のrabbitmq(ブローカーとして)と1人の赤ちゃん(結果用)が間にあります。
私が直面してる問題がある:消費者で
- 、私はasync_result.ready()は決して Trueを返しasync_result = ZipUp.delay(unique_directory)を介してAsyncResultを取得取り戻すが、(少なくとも9秒間は実行されません)。 コンシューマタスクの場合でも、基本的には文字列を返しません。
- 私は、rabbitmq管理Webインターフェイスで、私のメッセージ がrabbitmq交換で受信されているのを確認できますが、対応するrabbitmqキュー には表示されません。また、 によって送信されたログメッセージは、ZipUpタスクの開始時点では、 が記録されているようには見えません。
私はAsyncResultから結果を返そうとしないとうまくいきます!しかし、私はちょっと呼び出しの結果を得ることを望んでいます - それは便利です:)。
以下は構成の詳細です。
リターンのために、次のように我々はセロリを設定している:
CELERY_RESULT_BACKEND = 'redis://%s' % _SHARED_WRITE_CACHE_HOST_INTERNAL
CELERY_RESULT = Celery('TEST', broker=CELERY_BROKER)
CELERY_RESULT.conf.update(
BROKER_HEARTBEAT=60,
CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,
CELERY_TASK_RESULT_EXPIRES=100,
CELERY_IGNORE_RESULT=False,
CELERY_RESULT_PERSISTENT=False,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)
は、私たちは、戻り値を期待していない別のセロリの構成を有していて、それが動作します - 同じプログラムで。
@CELERY_RESULT.task(name='ZipUp', exchange='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION)
def ZipUp(directory): # pylint: disable=invalid-name
""" Task stub """
_unused_directory = directory
raise NotImplementedError
このスタブに= =の代わりに為替のキューを使用すると、単純であろうと言及されています:セロリプロデューサーのスタブがどのように見える
CELERY = Celery('TEST', broker=CELERY_BROKER)
CELERY.conf.update(
BROKER_HEARTBEAT=60,
CELERY_RESULT_BACKEND=CELERY_BROKER,
CELERY_TASK_RESULT_EXPIRES=100,
CELERY_STORE_ERRORS_EVEN_IF_IGNORED=False,
CELERY_IGNORE_RESULT=True,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)
:ように見えます。誰でもそれを確認できますか(私はグーグルではありますが、そのトピックについては全く何も見つかりませんでした)?どうやら、キューアウトを使うことはできますが、ファンアウトやそれに類するものを使用したいのでない限り、すべてのセロリーバックエンドに交換の概念があるわけではありません。
とにかく、セロリの消費者は、で始まり:
@task(queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION, name='ZipUp')
@StatsInstrument('workflow.ZipUp')
def ZipUp(directory): # pylint: disable=invalid-name
'''
Zip all files in directory, password protected, and return the pathname of the new zip archive.
:param directory Directory to zip
'''
try:
LOGGER.info('zipping up {}'.format(directory))
しかし、「アップビュンと、」どこにも記録されません。私はその文字列のセロリサーバー上のすべての(ディスクバックアップされた)ファイルを検索し、2つのヒットを得ました:/ usr/bin/zip、そしてセロリタスクのコード - そしてログメッセージはありません。
提案がありますか?
読んでいただきありがとうございます!プロデューサーで次のタスクスタブを使用したことが表示されます