私はTwitterのストリーミングAPIへの接続を作成するタスクを(サブクラス化してcelery.task.Taskによって)作成しています。 Twitter API呼び出しの場合、私はtweepyを使用しています。セロリのドキュメンテーションから読んだように、「タスクはすべての要求に対してインスタンス化されるのではなく、グローバルインスタンスとしてタスクレジストリに登録されます。私は、タスクのapply_async(または遅延)を呼び出すたびに、もともとインスタンス化されたが、それは起こらないタスクにアクセスすることを期待していました。代わりに、カスタム・タスク・クラスの新しいインスタンスが作成されます。元のカスタムタスクにアクセスできるようにする必要があります。これは、tweepy API呼び出しによって作成された元の接続を終了できる唯一の方法であるためです。セロリはタスクのいくつかのインスタンスを作成します
from celery import registry
from celery.task import Task
class FollowAllTwitterIDs(Task):
def __init__(self):
# requirements for creation of the customstream
# goes here. The CustomStream class is a subclass
# of tweepy.streaming.Stream class
self._customstream = CustomStream(*args, **kwargs)
@property
def customstream(self):
if self._customstream:
# terminate existing connection to Twitter
self._customstream.running = False
self._customstream = CustomStream(*args, **kwargs)
def run(self):
self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()
self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]
とDjangoビューの
def connect_to_twitter(request):
if request.method == 'POST':
do_stuff_here()
.
.
.
follow_all_twitterids.apply_async(args=[], kwargs={})
return
を任意の助けをいただければ幸いです:これは役立つだろう場合はここで
は、コードのいくつかの作品です。 :D
EDIT:質問のための追加のコンテキストについて
フィルタ()メソッドが呼び出されるたびに、CustomStreamオブジェクトはhttplib.HTTPSConnectionインスタンスを作成します。この接続は、別の接続を作成しようとするたびに閉じる必要があります。 customstream.runningをFalseに設定すると、接続は閉じられます。
感謝。私は上記を実装しようとし、ちょうどcelery.utils.cached_propertyが使用されているのだろうか? – Christian
それは間違ってそこに追加されたばかりです:私はそれを削除します – asksol