単一のTCP接続(メモリの制約)しか受け入れられないデバイスとのインターフェイスを試行しているため、すべてのワーカースレッドの接続を開始することは、データベース接続などのクライアント/サーバー状況。Celeryタスクでのマルチプロセッシング同時実行メカニズムの使用
私はの形式で、スレッド間でグローバルにアクセス可能であるマルチプロセッシング・マネージャーの辞書を使用して試してみました:
clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}
そして、このようなタスク:
from celery import shared_task
from .celery import manager, clients
@shared_task
def send_command(controller, commandname, args):
"""Send a command to the controller."""
# Create client connection if one does not exist.
conn = None
addr, port = controller
if controller not in clients:
conn = Client(addr, port)
conn.connect()
lock = manager.RLock()
clients[controller] = (conn, lock,)
print("New controller connection to %s:%s" % (addr, port,))
else:
conn, lock = clients[controller]
try:
f = getattr(conn, commandname) # See if connection.commandname() exists.
except Exception:
raise Exception("command: %s not known." % (commandname))
with lock:
res = f(*args)
return res
しかし、タスク次のようなシリアライゼーションエラーで失敗します。
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
シリアル化可能でない値を持つタスクを呼び出さず、タスクがシリアル化可能でない値を返そうとしないにもかかわらず、セロリはこのグローバルオブジェクトをシリアル化しようとしているように思えますか?
私には何が欠けていますか? Celeryタスクで使用されるクライアントデバイス接続をスレッドセーフでスレッド間でアクセスできるようにする方法はありますか?コード例?
これはあなたの状況ではうまくいくのかどうかはわかりませんが、プロセス間のソケット接続の共有を可能にする 'multiprocessing.reduction'について読むことを思い出しました。 [例についてはこのブログ記事を参照](http://foobarnbaz.com/2011/08/30/developing-scalable-services-with-python/)。 – antikantian
クライアントはrawソケットで動作していません。プロトコルを持つTwisted接続オブジェクトです。未処理のソケットを使用するか、またはfdからTwisted接続オブジェクトを再照会することは自明ではありません。 –
私は、既存のソケットの周りにTwistedプロトコルをラップする方法を打ち出しました。しかし、私の場合は動作しませんでした。なぜなら、セラーのコンシューマはワーカーメインプロセスの*独立した子プロセス*が必要なファイル記述子(Redis )、そしてFDを共有するためのunixパイプのもつれを設定することはあまりにも多くのハッカーです。 私の状況の問題は、デバイスがメモリに制約されていて、単に複数の接続を持つことができないということです...だから、私は単一のコンシューマとそれぞれ1つのデバイスを持つワーカーのプールを持つことにしました。よくない! –