2017-03-16 13 views
0

セロリの作業者の側でジュピターカーネルを使いたいです。各セロリワーカーにはJupyter Kernelが1つあります。タスク内のセロリのワーカーインスタンスへのアクセス

これを達成するために、デフォルトのWorkerセロリのクラスをオーバーライドしています。作業者の初期化時にjupyterカーネルを開始しています。停止メソッドを使用すると、jupyterカーネルをシャットダウンします。

私が直面している現在の問題は、タスクが実行されている間に、そのカーネルインスタンスにタスク内でアクセスする方法です。

celeryアプリケーションのWorkerクラス定義をオーバーライドするには、app.Worker = CustomWorkerよりも良い方法はありますか?

ここにカスタムワーカーのセロリ設定があります。ここで

from __future__ import absolute_import, unicode_literals 
from celery import Celery 
from jupyter_client import MultiKernelManager 

app = Celery('proj', 
    broker='redis://', 
    backend='redis://', 
    include=['tasks']) 

app.conf.update(
    result_expires=3600 
) 

class CustomWorker(app.Worker): 
    def __init__(self, *args, **kwargs): 
     self.km = MultiKernelManager() 
     self.kernel_id = self.km.start_kernel() 
     print("Custom initializing") 
     self.kernel_client = km.get_kernel(kernel_id).client() 
     super(CustomWorker, self).__init__(*args, **kwargs) 

    def on_close(self): 
     self.km.shutdown_kernel(self.kernel_id) 
     super(CustomWorker, self).on_close() 

app.Worker = CustomWorker 

if __name__ == '__main__': 
    app.start() 

はリクエストオブジェクトにその労働者のインスタンス情報を追加tasks.py

from __future__ import absolute_import, unicode_literals 
from celery import app 

from celery import Task 
from tornado import gen 
from jupyter_client import MultiKernelManager 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 
ioloop.install() 

reply_futures = {} 

# This is my celery task where I pass the arbitary python code to execute on 
# some celery worker(actually to the corresponding kernel) 
@app.task 
def pythontask(code): 
    # I don't know how to get the kernel_client for current celery worker !!? 
    kernel_client = self.get_current_worker().kernel_client 
    mid = kernel_client.execute(code) 

    # defining the callback which will be executed when message arrives on 
    # zmq stream 
    def reply_callback(session, stream, msg_list): 
     idents, msg_parts = session.feed_identities(msg_list) 
     reply = session.deserialize(msg_parts) 
     parent_id = reply['parent_header'].get('msg_id') 
     reply_future = reply_futures.get(parent_id) 
     if reply_future: 
      reply_future.set_result(reply) 

    @gen.coroutine 
    def execute(kernel_client, code): 
     msg_id = kernel_client.execute(code) 
     f = reply_futures[msg_id] = Future() 
     yield f 
     raise gen.Return(msg_id) 

    # initializing the zmq streams and attaching the callback to receive message 
    # from the kernel 
    shell_stream = ZMQStream(kernel_client.shell_channel.socket) 
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket) 
    shell_stream.on_recv_stream(partial(reply_callback, kernel_client.session)) 
    iopub_stream.on_recv_stream(partial(reply_callback, kernel_client.session)) 

    # create a IOLoop 
    loop = ioloop.IOLoop.current() 
    # listen on the streams 
    msg_id = loop.run_sync(lambda: execute(kernel_client,code)) 
    print(reply_msgs[msg_id]) 
    reply_msgs[msg_id] = [] 

    # Disable callback and automatic receiving. 
    shell_stream.on_recv_stream(None) 
    iopub_stream.on_recv_stream(None) 

答えて

0

のスケルトンで、私の問題を解決しました。そのために私はworkerクラスのメソッド_process_taskをオーバーライドしました。

def _process_task(self, req): 
    try: 
    req.kwargs['kernel_client'] = self.kernel_client 
    print("printing from _process_task {}".format(req.kwargs)) 
    req.execute_using_pool(self.pool) 
    except TaskRevokedError: 
    try: 
     self._quick_release() # Issue 877 
    except AttributeError: 
     pass 
    except Exception as exc: 
    logger.critical('Internal error: %r\n%s',exc, traceback.format_exc(), exc_info=True) 

は、ここで私は私がソロモードで労働者を起動したときに、この事はのみ動作しそうではない、それはいくつかの酸洗エラーがスローされますkernel_client

@app.task(bind=True) 
def pythontask(self,code, kernel_client=None): 

    mid = kernel_client.execute(code) 

    print("{}".format(kernel_client)) 
    print("{}".format(mid)) 

にアクセスところ、私の仕事です。とにかくソロの労働者を使用して私の要件はこのソリューションは私のために働くので

関連する問題