私はRick BransonのPyConビデオを見てきました:Messaging at Scale at Instagram。あなたはこの質問に答えるためにビデオを見たいかもしれません。 Rick BransonはCelery、Redis、RabbitMQを使用しています。あなたにスピードをあげるために、各ユーザーは自分の家庭用の赤字リストを持っています。各リストには、彼らが従う人々によって投稿された写真のメディアIDが含まれています。Django、Celery、Redis、RabbitMQ:ファンアウトオン書き込みの連鎖タスク
たとえば、Justin Bieberには150万人のフォロワーがいます。彼が写真を投稿するとき、その写真のIDは、彼の信者のそれぞれのための個々の赤字リストに挿入する必要があります。これは、ファンアウト・オン・ライト・アプローチと呼ばれます。しかし、このアプローチにはいくつかの信頼性の問題があります。それはうまくいくが、Justin BieberやLady Gagaのような何百万人ものフォロワーを抱えている人には、ウェブリクエスト(リクエストを完了するために0〜500msのところ)でこれを行うことが問題になる可能性がある。それまでに、要求はタイムアウトになります。
だから、Rick Bransonは分散メッセージパッシングに基づいた非同期タスクキュー/ジョブキューであるCeleryを使用することに決めました。フォロワーのリストにメディアIDを挿入するなどの重度の持ち上げは、Web要求の外で非同期で行うことができます。リクエストは完了し、セロリはIDをすべてのリストに挿入し続けます。
この方法では不思議なことになります。しかし、再び、ジャスティンの追随者のすべてをセロリに1つの大きな塊で届けたいとは思わない。なぜならセロリの労働者を縛るからだ。同時に複数の作業者が同時に作業して、作業が速くなるのはなぜですか?素晴らしいアイデア!このチャンクを小さなチャンクに分割し、それぞれのバッチで作業する別々の従業員にしたいと考えています。 Rick Bransonは1万人のフォロワーを務め、ジャスティン・ビーバーのフォロワーすべてのメディアIDを挿入し続けるまで、カーソルと呼ばれるものを使用します。ビデオでは、彼はこれについて3時56分に語ります。
誰もがこのことをもっと説明し、それがどのようにできるかの例を示しているのだろうかと思っていました。私は現在、同じセットアップを試みています。私はRedisサーバーと通信するためにAndy McCurdyのredis-py pythonクライアントライブラリを使用します。私のサービス上のすべてのユーザーに対して、私はredis followersリストを作成します。
だから、343のIDを持つユーザーは、次のキーのリストを持っているでしょう:
followers:343
を私はまた、各ユーザのhomefeedリストを作成します。すべてのユーザーが独自のリストを持っています。で
homefeed:1990
「フォロワー:343」Redisのリストは、それがユーザー343ユーザー343に続く人々のすべてのIDが含まれてい だから1990のIDを持つユーザーは、次のキーのリストを持っているでしょうフォロワー20,007人いる以下では、リスト内のすべてのIDをインデックス0から末尾-1まで検索しています。ちょうどそのように見えます。
>>> r_server.lrange("followers:343", 0, -1)
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs.
何あなたが見ることはここでは、ユーザ343
に続くすべてのユーザIDのリストは、私のinsert_into_homefeed機能が含まれている私のがproj/mydjangoapp/tasks.pyです:
from __future__ import absolute_import
from celery import shared_task
import redis
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX')
@shared_task
def insert_into_homefeed(photo_id, user_id):
# Grab the list of all follower IDs from Redis for user_id.
r_server = redis.Redis(connection_pool=pool)
followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1)
# Now for each follower_id in followers_list, find their homefeed key
# in Redis and insert the photo_id into that homefeed list.
for follower_id in followers_list:
homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id)
return "Fan Out Completed for %s" % (user_id)
このタスクでは、Djangoビューから呼び出されると、ユーザー343に従うユーザーのすべてのIDを取得し、その写真フィードIDをすべてのホームフィードリストに挿入します。
私のproj/mydjangoapp/views.pyのアップロードビューです。私は基本的にセロリの遅延メソッドを呼び出し、要求がすぐに終了するようにneccessary変数を渡す:
# Import the Celery Task Here
from mydjangoapp.tasks import insert_into_homefeed
@csrf_exempt
def Upload(request):
if request.method == 'POST':
data = json.loads(request.body)
newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url'])
newPhoto_ID = newPhoto.pk
insert_into_homefeed.delay(newPhoto_ID, data['user_id'])
return HttpResponse("Request Completed")
どのように私はそれが万でバッチ処理されるような方法でこれを行うことができますか?
お返事ありがとうございます! :)素敵なアプローチ!しかし、これは無限ループではないでしょうか?リスト全体を実行した後も、タスクが何度も繰り返し呼び出されることはありませんか? – noahandthewhale
ああ!私はちょうどfollowers_list_batchを見ました: – noahandthewhale
あなたはそれを持っています。それは、明示的なreturn文を使用していたはずです。 –