2016-03-24 12 views
1

単一のTCP接続(メモリの制約)しか受け入れられないデバイスとのインターフェイスを試行しているため、すべてのワーカースレッドの接続を開始することは、データベース接続などのクライアント/サーバー状況。Celeryタスクでのマルチプロセッシング同時実行メカニズムの使用

私はの形式で、スレッド間でグローバルにアクセス可能であるマルチプロセッシング・マネージャーの辞書を使用して試してみました:

clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}

そして、このようなタスク:

from celery import shared_task 
from .celery import manager, clients 

@shared_task 
def send_command(controller, commandname, args): 
    """Send a command to the controller.""" 
    # Create client connection if one does not exist. 
    conn = None 
    addr, port = controller 
    if controller not in clients: 
     conn = Client(addr, port) 
     conn.connect() 
     lock = manager.RLock() 
     clients[controller] = (conn, lock,) 
     print("New controller connection to %s:%s" % (addr, port,)) 
    else: 
     conn, lock = clients[controller] 

    try: 
     f = getattr(conn, commandname) # See if connection.commandname() exists. 
    except Exception: 
     raise Exception("command: %s not known." % (commandname)) 

    with lock: 
     res = f(*args) 
     return res 

しかし、タスク次のようなシリアライゼーションエラーで失敗します。

_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

シリアル化可能でない値を持つタスクを呼び出さず、タスクがシリアル化可能でない値を返そうとしないにもかかわらず、セロリはこのグローバルオブジェクトをシリアル化しようとしているように思えますか?

私には何が欠けていますか? Celeryタスクで使用されるクライアントデバイス接続をスレッドセーフでスレッド間でアクセスできるようにする方法はありますか?コード例?

+0

これはあなたの状況ではうまくいくのかどうかはわかりませんが、プロセス間のソケット接続の共有を可能にする 'multiprocessing.reduction'について読むことを思い出しました。 [例についてはこのブログ記事を参照](http://foobarnbaz.com/2011/08/30/developing-scalable-services-with-python/)。 – antikantian

+0

クライアントはrawソケットで動作していません。プロトコルを持つTwisted接続オブジェクトです。未処理のソケットを使用するか、またはfdからTwisted接続オブジェクトを再照会することは自明ではありません。 –

+0

私は、既存のソケットの周りにTwistedプロトコルをラップする方法を打ち出しました。しかし、私の場合は動作しませんでした。なぜなら、セラーのコンシューマはワーカーメインプロセスの*独立した子プロセス*が必要なファイル記述子(Redis )、そしてFDを共有するためのunixパイプのもつれを設定することはあまりにも多くのハッカーです。 私の状況の問題は、デバイスがメモリに制約されていて、単に複数の接続を持つことができないということです...だから、私は単一のコンシューマとそれぞれ1つのデバイスを持つワーカーのプールを持つことにしました。よくない! –

答えて

0
... 
self._send_bytes(ForkingPickler.dumps(obj)) 
File "/usr/lib64/python3.4/multiprocessing/reduction.py", line 50, in dumps 
cls(buf, protocol).dump(obj) 
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed 

インターネットの周りを見回した後、私はおそらくトレースバックで重要なものを見逃していたことに気付きました。トレースバックを近づけた後、私はセロリが接続オブジェクトをピクルしようとするのではなく、むしろマルチプロセシング。縮小は、一方の側でシリアル化し、他方の側で再構成するために使用されます。

私は、この問題を回避するためのいくつかの代替アプローチを持っていますが、クライアント・ライブラリ接続オブジェクトを借りて使用することは当初は望んでいませんでしたが、マルチプロセッシングとプリフォークでは不可能です。

+0

ああ、私はあなたが同じ接続オブジェクトを渡したいので、私の答えは少し急いであったと思います。 mutliprocessingとpreforkは、通常、プロセス間の接続や入出力をうまく使っていません。通常、ポストフォーク接続を確立します。 preforkからeventletまたはgeventへの同時性の切り替えを検討してから、接続プールを実装しましたか? – antikantian

0

Redisを使用して分散ロックマネージャを実装する方法はありますか? Redis pythonクライアントにはロック機能が組み込まれています。また、redis.ioのthis docを参照してください。 RabbitMQや他のブローカーを使用している場合でも、Redisは非常に軽量です。

from functools import wraps 

def device_lock(block=True): 
    def decorator(func): 
     @wraps(func) 
     def wrapper(*args, **kwargs): 
      return_value = None 
      have_lock = False 
      lock = redisconn.lock('locks.device', timeout=2, sleep=0.01) 
      try: 
       have_lock = lock.acquire(blocking=block) 
       if have_lock: 
        return_value = func(*args, **kwargs) 
      finally: 
       if have_lock: 
        lock.release() 
      return return_value 
     return wrapper 
    return decorator 

@shared_task 
@device_lock 
def send_command(controller, commandname, args): 
    """Send a command to the controller.""" 
    ... 

あなたはまた、セロリのタスクの料理からthis approachを使用することができます:デコレータとして例えば

from celery import task 
from celery.utils.log import get_task_logger 
from django.core.cache import cache 
from hashlib import md5 
from djangofeeds.models import Feed 

logger = get_task_logger(__name__) 

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes 

@task(bind=True) 
def import_feed(self, feed_url): 
    # The cache key consists of the task name and the MD5 digest 
    # of the feed URL. 
    feed_url_hexdigest = md5(feed_url).hexdigest() 
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest) 

    # cache.add fails if the key already exists 
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE) 
    # memcache delete is very slow, but we have to use it to take 
    # advantage of using add() for atomic locking 
    release_lock = lambda: cache.delete(lock_id) 

    logger.debug('Importing feed: %s', feed_url) 
    if acquire_lock(): 
     try: 
      feed = Feed.objects.import_feed(feed_url) 
     finally: 
      release_lock() 
     return feed.url 

    logger.debug(
     'Feed %s is already being imported by another worker', feed_url) 
+0

私はこれらの解決策を知っていましたが、私が使用していない理由は、プロセス間で実際の接続オブジェクトを共有し、すでに開いている接続を使用するだけで済むためです。私は、タスクが実行されるたびに切断と再接続を回避しようとしています。 ワーカーを1つのスレッドで実行して再利用すると、接続オブジェクトをグローバルとして保持できます。私はこれらのクライアントに単一プロセスのワーカーのプールを使用することを検討しています。そうでなければ、私がメッセージを送るたびに接続することを選択すると、私はRedisを使ってロックを行います。他の解決策の中で... –

0

は、あなたの代わりにプロセスのgeventまたはeventletセロリの労働者を使用しようとしましたし、スレッド?この場合、グローバルvarまたはthreading.local()を使用して接続オブジェクトを共有できます。

+0

私はイベントレットを使ってロックアップを取得していました。なぜ私は何をしようとしているのアイデンティティをブロックするためのほとんどのインセンティブがないことを見つけるためにもっと努力することができますeventlet/geventのイベントループの性質に適していません。 –

関連する問題