2017-01-19 17 views
2

セロリの作業員にテンソルフローを使用しようとしています。私は労働者からの応答を受け取るのではなく、タイムアウトに遭遇しました。セロリの労働者からTensorFlowの応答がありません

私は、次のコードを使用:私は、このコマンドを使用して、労働者を実行

tasks.py

from celery import Celery 
from celery.signals import worker_init 

import tensorflow as tf 

app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0') 

class TFModel(): 
    def __init__(self): 
    self.sess = tf.Session() 
    def run(self): 
    return self.sess.run(tf.constant('hello')) 

tf_model = None 

@worker_init.connect 
def on_worker_init(**_): 
    global tf_model 
    tf_model = TFModel() 
    print(tf_model.run()) 
    return 

@app.task(time_limit=10) 
def run(): 
    return tf_model.run() 

test.py

import time 
from tasks import run 

r=run.delay() 
while not r.ready(): 
    time.sleep(2) 

print(r.get()) 

を。

$ celery -A tasks worker -l info -c 1

私は労働者を実行するとon_worker_init()print(tf_model.run())を持っていたことから、helloは、プリントアウトしました。 これは、テンソルの流れが適切に機能することを意味します。

はその後、私は走った:

$ python test.py

を次に、私が得た:

celery.backends.base.TimeLimitExceeded: TimeLimitExceeded(10,)

を間違っていましたか? 何が起こったのかを調査するにはどうすればよいですか?

私の環境は次のとおりです。

python 3.5.1 
tensorflow 0.11.0 
celery 4.0.2 

感謝。

答えて

2

これを試してみてください:

import tensorflow as tf 
from celery import Celery 
from celery.utils.log import get_task_logger 
from celery.signals import worker_init, worker_process_init 
from models import Network, Extractor 
from celery.concurrency import asynpool 
asynpool.PROC_ALIVE_TIMEOUT = 100.0 #set this long enough 

logger = get_task_logger(__name__) 

CELERY_BROKER_URL = 'redis://localhost:6379/' 
CELERY_RESULT_BACKEND = 'redis://localhost:6379/' 

# Celery: Distributed Task Queue 
app = Celery('tasks', backend=CELERY_RESULT_BACKEND, broker=CELERY_BROKER_URL) 
app.conf.task_serializer = 'json' 
app.conf.result_serializer = 'json' 

tf_model = None 

@worker_process_init.connect() 
def on_worker_init(**_): 
    global tf_model 
    # Create server with model 
    logger.info('model for worker: started init') 
    print("model for dsa") 
    session = tf.Session() 
    model = Network(session, True) 
    #model.load_model('./models/test_2') 
    extractor = Extractor(model) 
    tf_model = extractor 
    logger.info('model for worker: initialized') 


@app.task(name='process_single') 
def process_single(image): 
    logger.info('process_single: started') 
    descriptor = tf_model.process_single(image) 
    logger.info('process_single: completed') 

    return descriptor 

それは、これが動作していることを私のためになります。

[2017-01-21 09:41:18,892: INFO/Worker-1] ???[???]: model for worker: started init 
[2017-01-21 09:41:18,893: WARNING/Worker-1] model for dsa 
[2017-01-21 09:41:18,902: INFO/MainProcess] Connected to redis://localhost:6379// 
[2017-01-21 09:41:18,915: INFO/MainProcess] mingle: searching for neighbors 
[2017-01-21 09:41:19,920: INFO/MainProcess] mingle: all alone 
[2017-01-21 09:41:19,949: WARNING/MainProcess] [email protected] ready. 
[2017-01-21 09:41:20,930: INFO/Worker-1] ???[???]: model for worker: initialized 
[2017-01-21 09:41:31,648: INFO/MainProcess] Received task: process_single[024068ba-9ea2-4405-8aab-d3504a06aa55] 
[2017-01-21 09:41:31,658: INFO/Worker-1] process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]: process_single: started 
[2017-01-21 09:41:33,125: INFO/Worker-1] process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]: process_single: completed 
[2017-01-21 09:41:33,128: INFO/MainProcess] Task process_single[024068ba-9ea2-4405-8aab-d3504a06aa55] succeeded in 1.470330449s: [153608.4375, 0.0, 0.0, 243285.75, 0.0, 155679.671875, 346120.625, 70663.265625, 0.0, 29445.03125, 0.0, 518396.25, 0.0,... 
+0

は、あなたの答えをありがとう!あなたはケラを使っていますか? sess.run()がセロリで動作しないのはなぜかと思います。 – Kumon

+0

私はテンソルフローを直接使用していますので、ケラスを使わず、コンストラクタでネットワークの初期化を行います。 tf_model.process_singleでsession.runを問題なく使用する。 – Cospel

+0

@ worker_process_init.connect()を使用していて、@worker_initを使用していないことに注意してください。また、モデルが正しくロードされているかどうかを確認できるように、印刷/ロギングを試してみてください。 – Cospel

関連する問題