2016-07-28 16 views
0

私はPython 2.7(sigh)、celery == 3.1.19、librabbitmq == 1.6.1、rabbitmq-server-3.5.6-1.noarch、およびredis 2.8.24(redis-cli infoから)を使用しています。 。セロリ、RabbitMQ、Redis:セロリメッセージは交換に入りますが、キューには入りませんか?

セロリの生産者からセロリの消費者にメッセージを送信し、その結果を生産者に戻そうとしています。 1人のプロデューサーと1人の消費者がいますが、2人のrabbitmq(ブローカーとして)と1人の赤ちゃん(結果用)が間にあります。

私が直面してる問題がある:消費者で

  1. 、私はasync_result.ready()は決して Trueを返しasync_result = ZipUp.delay(unique_directory)を介してAsyncResultを取得取り戻すが、(少なくとも9秒間は実行されません)。 コンシューマタスクの場合でも、基本的には文字列を返しません。
  2. 私は、rabbitmq管理Webインターフェイスで、私のメッセージ がrabbitmq交換で受信されているのを確認できますが、対応するrabbitmqキュー には表示されません。また、 によって送信されたログメッセージは、ZipUpタスクの開始時点では、 が記録されているようには見えません。

私はAsyncResultから結果を返そうとしないとうまくいきます!しかし、私はちょっと呼び出しの結果を得ることを望んでいます - それは便利です:)。

以下は構成の詳細です。

リターンのために、次のように我々はセロリを設定している:

CELERY_RESULT_BACKEND = 'redis://%s' % _SHARED_WRITE_CACHE_HOST_INTERNAL 
CELERY_RESULT = Celery('TEST', broker=CELERY_BROKER) 
CELERY_RESULT.conf.update(
    BROKER_HEARTBEAT=60, 
    CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND, 
    CELERY_TASK_RESULT_EXPIRES=100, 
    CELERY_IGNORE_RESULT=False, 
    CELERY_RESULT_PERSISTENT=False, 
    CELERY_ACCEPT_CONTENT=['json'], 
    CELERY_TASK_SERIALIZER='json', 
    CELERY_RESULT_SERIALIZER='json', 
    ) 

は、私たちは、戻り値を期待していない別のセロリの構成を有していて、それが動作します - 同じプログラムで。

@CELERY_RESULT.task(name='ZipUp', exchange='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION) 
def ZipUp(directory): # pylint: disable=invalid-name 
    """ Task stub """ 
    _unused_directory = directory 
    raise NotImplementedError 

このスタブに= =の代わりに為替のキューを使用すると、単純であろうと言及されています:セロリプロデューサーのスタブがどのように見える

CELERY = Celery('TEST', broker=CELERY_BROKER) 
CELERY.conf.update(
    BROKER_HEARTBEAT=60, 
    CELERY_RESULT_BACKEND=CELERY_BROKER, 
    CELERY_TASK_RESULT_EXPIRES=100, 
    CELERY_STORE_ERRORS_EVEN_IF_IGNORED=False, 
    CELERY_IGNORE_RESULT=True, 
    CELERY_ACCEPT_CONTENT=['json'], 
    CELERY_TASK_SERIALIZER='json', 
    CELERY_RESULT_SERIALIZER='json', 
    ) 

:ように見えます。誰でもそれを確認できますか(私はグーグルではありますが、そのトピックについては全く何も見つかりませんでした)?どうやら、キューアウトを使うことはできますが、ファンアウトやそれに類するものを使用したいのでない限り、すべてのセロリーバックエンドに交換の概念があるわけではありません。

とにかく、セロリの消費者は、で始まり:

@task(queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION, name='ZipUp') 
@StatsInstrument('workflow.ZipUp') 
def ZipUp(directory): # pylint: disable=invalid-name 
    ''' 
    Zip all files in directory, password protected, and return the pathname of the new zip archive. 
    :param directory Directory to zip 
    ''' 
    try: 
     LOGGER.info('zipping up {}'.format(directory)) 

しかし、「アップビュンと、」どこにも記録されません。私はその文字列のセロリサーバー上のすべての(ディスクバックアップされた)ファイルを検索し、2つのヒットを得ました:/ usr/bin/zip、そしてセロリタスクのコード - そしてログメッセージはありません。

提案がありますか?

読んでいただきありがとうございます!プロデューサーで次のタスクスタブを使用したことが表示されます

答えて

0

は、問題を解決:要するに

@CELERY_RESULT.task(name='ZipUp', queue='cognition.workflow.ZipUp_%s' % INTERNAL_VERSION) 
def ZipUp(directory): # pylint: disable=invalid-name 
    """ Task stub """ 
    _unused_directory = directory 
    raise NotImplementedError 

、それは=の代わりに交換=のキューを使用しています。

関連する問題