2014-01-08 26 views
9

私はRick BransonのPyConビデオを見てきました:Messaging at Scale at Instagram。あなたはこの質問に答えるためにビデオを見たいかもしれません。 Rick BransonはCelery、Redis、RabbitMQを使用しています。あなたにスピードをあげるために、各ユーザーは自分の家庭用の赤字リストを持っています。各リストには、彼らが従う人々によって投稿された写真のメディアIDが含まれています。Django、Celery、Redis、RabbitMQ:ファンアウトオン書き込みの連鎖タスク

たとえば、Justin Bieberには150万人のフォロワーがいます。彼が写真を投稿するとき、その写真のIDは、彼の信者のそれぞれのための個々の赤字リストに挿入する必要があります。これは、ファンアウト・オン・ライト・アプローチと呼ばれます。しかし、このアプローチにはいくつかの信頼性の問題があります。それはうまくいくが、Justin BieberやLady Gagaのような何百万人ものフォロワーを抱えている人には、ウェブリクエスト(リクエストを完了するために0〜500msのところ)でこれを行うことが問題になる可能性がある。それまでに、要求はタイムアウトになります。

だから、Rick Bransonは分散メッセージパッシングに基づいた非同期タスクキュー/ジョブキューであるCeleryを使用することに決めました。フォロワーのリストにメディアIDを挿入するなどの重度の持ち上げは、Web要求の外で非同期で行うことができます。リクエストは完了し、セロリはIDをすべてのリストに挿入し続けます。

この方法では不思議なことになります。しかし、再び、ジャスティンの追随者のすべてをセロリに1つの大きな塊で届けたいとは思わない。なぜならセロリの労働者を縛るからだ。同時に複数の作業者が同時に作業して、作業が速くなるのはなぜですか?素晴らしいアイデア!このチャンクを小さなチャンクに分割し、それぞれのバッチで作業する別々の従業員にしたいと考えています。 Rick Bransonは1万人のフォロワーを務め、ジャスティン・ビーバーのフォロワーすべてのメディアIDを挿入し続けるまで、カーソルと呼ばれるものを使用します。ビデオでは、彼はこれについて3時56分に語ります。

誰もがこのことをもっと説明し、それがどのようにできるかの例を示しているのだろうかと思っていました。私は現在、同じセットアップを試みています。私はRedisサーバーと通信するためにAndy McCurdyのredis-py pythonクライアントライブラリを使用します。私のサービス上のすべてのユーザーに対して、私はredis followersリストを作成します。

だから、343のIDを持つユーザーは、次のキーのリストを持っているでしょう:

followers:343 

を私はまた、各ユーザのhomefeedリストを作成します。すべてのユーザーが独自のリストを持っています。で

homefeed:1990 

「フォロワー:343」Redisのリストは、それがユーザー343ユーザー343に続く人々のすべてのIDが含まれてい だから1990のIDを持つユーザーは、次のキーのリストを持っているでしょうフォロワー20,007人いる以下では、リスト内のすべてのIDをインデックス0から末尾-1まで検索しています。ちょうどそのように見えます。

>>> r_server.lrange("followers:343", 0, -1) 
['8', '7', '5', '3', '65', '342', '42', etc...] ---> for the sake of example, assume this list has another 20,000 IDs. 

何あなたが見ることはここでは、ユーザ343

に続くすべてのユーザIDのリストは、私のinsert_into_homefeed機能が含まれている私のがproj/mydjangoapp/tasks.pyです:

from __future__ import absolute_import 
from celery import shared_task 
import redis 
pool = redis.ConnectionPool(host='XX.XXX.XXX.X', port=6379, db=0, password='XXXXX') 

@shared_task 
def insert_into_homefeed(photo_id, user_id): 
    # Grab the list of all follower IDs from Redis for user_id. 
    r_server = redis.Redis(connection_pool=pool) 

    followers_list = r_server.lrange("followers:%s" % (user_id), 0, -1) 

    # Now for each follower_id in followers_list, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list. 

    for follower_id in followers_list: 
     homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id) 
    return "Fan Out Completed for %s" % (user_id) 

このタスクでは、Djangoビューから呼び出されると、ユーザー343に従うユーザーのすべてのIDを取得し、その写真フィードIDをすべてのホームフィードリストに挿入します。

私のproj/mydjangoapp/views.pyのアップロードビューです。私は基本的にセロリの遅延メソッドを呼び出し、要求がすぐに終了するようにneccessary変数を渡す:

# Import the Celery Task Here 
from mydjangoapp.tasks import insert_into_homefeed 


@csrf_exempt 
def Upload(request): 
    if request.method == 'POST': 
     data = json.loads(request.body) 
     newPhoto = Photo.objects.create(user_id = data['user_id'], description= data['description'], photo_url = data['photo_url']) 
     newPhoto_ID = newPhoto.pk 
     insert_into_homefeed.delay(newPhoto_ID, data['user_id']) 
     return HttpResponse("Request Completed") 

どのように私はそれが万でバッチ処理されるような方法でこれを行うことができますか?

答えて

8

ビデオで説明されているアプローチは、タスク「連鎖」です。

タスクメソッドを起動してチェーンとして実行するには、フォロワーのリストにインデックスを表す追加のパラメーターを追加する必要があります。フォロワーの完全なリストで作業するのではなく、タスクは渡された索引引数から始まる固定されたバッチ・サイズでのみ機能します。タスクが完了すると、新しいタスクが作成され、新しいインデックスが渡されます。

INSERT_INTO_HOMEFEED_BATCH = 10000 

@shared_task 
def insert_into_homefeed(photo_id, user_id, index=0): 
    # Grab the list of all follower IDs from Redis for user_id. 
    r_server = redis.Redis(connection_pool=pool) 

    range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1 # adjust for zero-index 

    followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit) 

    if not followers_list_batch: 
     return # zero followers or no more batches 

    # Now for each follower_id in followers_list_batch, find their homefeed key 
    # in Redis and insert the photo_id into that homefeed list. 
    for follower_id in followers_list: 
     homefeed_list = r_server.lpush("homefeed:%s" % (follower_id), photo_id) 

    insert_into_homefeed.delay(photo_id, user_id, range_limit + 1) 

これはRedisのlists are orderedとLRANGEコマンドdoesn't return an error on out-of-range inputsので、うまく動作します。

+0

お返事ありがとうございます! :)素敵なアプローチ!しかし、これは無限ループではないでしょうか?リスト全体を実行した後も、タスクが何度も繰り返し呼び出されることはありませんか? – noahandthewhale

+0

ああ!私はちょうどfollowers_list_batchを見ました: – noahandthewhale

+0

あなたはそれを持っています。それは、明示的なreturn文を使用していたはずです。 –

関連する問題