2016-03-25 22 views
2

私はウサギとセロリーには新しいので、私と一緒に裸にしてください。セロリとメッセージブローカリングをやってCloudAMQP RabbitMQのワーカーとDjangoプロジェクト - だからここ セロリ、RabbitMQメッセージが送られ続ける

セットアップです。

マイセロリ/ RabbitMQの設定:次のコマンドでセロリを実行している

# RabbitMQ & Celery settings 
BROKER_URL = 'ampq://guest:[email protected]:5672/' # Understandably fake 
BROKER_POOL_LIMIT = 1 
BROKER_CONNECTION_TIMEOUT = 30 
BROKER_HEARTBEAT = 30 
CELERY_SEND_EVENTS = False 
CELERY_ACCEPT_CONTENT = ['json'] 
CELERY_TASK_SERIALIZER = 'json' 

ドッキングウィンドウコンテナ:

bash -c 'cd django && celery -A pkm_main worker -E -l info --concurrency=3' 

shared_task定義:

from __future__ import absolute_import 

from celery import shared_task 

@shared_task 
def push_notification(user_id, message): 
    logging.critical('Push notifications sent') 
    return {'status': 'success'} 

そして、私が実際にそれを呼び出します何かが起こったとき(私はコードの一部を省略した関連する):

from notificatons.tasks import push_notification 

    def like_this(self, **args): 
    # Do like stuff and then do .delay() 
    push_notification.delay(media.user.id, request.user.username + ' has liked your item') 

だから、これはRANである - すべてが結構なようだ - 出力はそうのようになります。

worker_1 | [2016-03-25 09:03:34,888: INFO/MainProcess] Received task: notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] 
worker_1 | [2016-03-25 09:03:35,333: CRITICAL/Worker-1] Push notifications sent 
worker_1 | [2016-03-25 09:03:35,336: INFO/MainProcess] Task notifications.tasks.push_notification[8443bd88-fa02-4ea4-9bff-8fbec8c91516] succeeded in 0.444933412999s: {'status': 'success'} 

だから私はタスクが実行し、正しく実行された集まる何から、メッセージを停止し、RabbitMQを停止する必要があります。

しかし、私RabbitMQの管理に私は、メッセージが公開され、ノンストップでお届け取得を参照してください:

RabbitMQ Admin GUI

それでは、私はこれから集まってることRabbitMQを、確認のいくつかの並べ替えを送信しようとして失敗しているということですと再試行?実際にこの動作をオフにする方法はありますか?

すべての助力とアドバイスは歓迎されます。

EDITは: - メッセージのタブが30秒ごとに来て、行くハートビート用のセーブ空である)私は(push_notification.delayに呼び出すまで重要な何かを言及し忘れました。私が.delay()を呼び出した後でのみ、これは起こります。

EDIT 2:CELERYBEAT_SCHEDULE設定(私は、彼らなしで実行しようとしました - そこには差はなかったが、念のためにそれらを追加する)

CELERYBEAT_SCHEDULE = { 
    "minutely_process_all_notifications": { 
     'task': 'transmissions.tasks.process_all_notifications', 
     'schedule': crontab(minute='*') 
    } 
} 

EDIT 3:追加したビューコード。また、私はCELERYBEAT_SCHEDULEを使用していません。私は将来のスケジュールされたタスクのコードに設定を保存しています。

from notifications.tasks import push_notification 

class MediaLikesView(BaseView): 
    def post(self, request, media_id): 
     media = self.get_object(media_id) 
     data = {} 
     data['media'] = media.id 
     data['user'] = request.user.id 
     serializer = MediaLikeSerializer(data=data) 
     if serializer.is_valid(): 
      like = serializer.save() 
      push_notification.delay(media.user.id, request.user.username + ' has liked your item') 
      serializer = MediaGetLikeSerializer(like) 
      return self.get_mocked_pagination_response(status=status.HTTP_204_NO_CONTENT) 
     return self.get_mocked_pagination_response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) 
+0

あなたは 'like_this'メソッドを呼び出す場所にコードを投稿してください。もしあれば 'CELERYBEAT_SCHEDULE'の値は? – trinchet

+0

ねえ。私はスケジュールを追加しました。このメソッド自体は、Django Rest Frameworkのビュークラスのpostメソッドの内部にあります。それが役に立つと思うかどうかは分かりませんが、私はそこにそれを投げることもできます。 –

+0

'process_all_notifications'メソッドはどこにありますか?つまり、「transmissions.tasks」にあるはずですが、このコードは表示されません。追加してもらえますか? – trinchet

答えて

1

セロリの混乱とゴシップです。コマンドライン引数に--without-gossip --without-mingle --without-heartbeatを追加して無効にします。

またそうでなければ、30代の後に切断さよ、あなたはコマンドライン上で無効にハートビートをしたらBROKER_HEARTBEAT = Noneを設定することを忘れないでください。 TCPキープアライブ、AMQPハートビート、あるいはさらに悪いことにセロリ自身のハートビートに依存することが最もよくあります。

+0

カール、あなたとあなたの家族全員に祝福を信じるような何らかの身分があるかもしれません。それがトリックでした。どうもありがとうございました。私はゴシップとミキシングが何をするのかを読み上げるつもりです。 –

関連する問題