2013-05-04 9 views
9

私はセロリを使用して、サーバー群のタスクをスケジュールして実行しようとしています。各タスクは多少時間がかかります(数時間)。サブプロセスを使用して、指定された入力を持つ特定のプログラムを呼び出す必要があります。このプログラムはstdoutとstderrの両方で多くの出力を生成します。セロリのストリーム結果

プログラムによって生成された出力をほぼリアルタイムでクライアントに表示する方法はありますか?出力をストリームすると、クライアントはサーバーにログインせずにサーバー上で実行されているタスクによって生成された出力を監視できますか?

+0

他のプログラムの実行中にセロリの仕事は何ですか?ちょうど 'subprocess.call'を使用していますか? –

+0

ただサブプロセスコール。 –

答えて

11

多くの要件と制約を指定していませんでした。あなたはすでにどこかにredisインスタンスがあると仮定します。

何ができることはラインによって、他のプロセスラインからの出力を読み、Redisのを使用して公開されています

import redis 
redis_instance = redis.Redis() 
p = subprocess.Popen(shlex.split("tail -f /tmp/foo"), stdout=subprocess.PIPE) 

while True: 
    line = p.stdout.readline() 
    if line: 
     redis_instance.publish('process log', line) 
    else: 
     break 
:ここ

は、テスト用のファイル/tmp/fooにあなたがechoデータをすることができます例です

import redis 

redis_instance = redis.Redis() 
pubsub = redis_instance.pubsub() 
pubsub.subscribe('process log') 

while True: 
    for message in pubsub.listen(): 
     print message # or use websockets to comunicate with a browser 

あなたが最後に処理をしたい場合は、することができます。例:別のプロセスで

セロリのタスクが完了した後に「終了」を送信します。

異なるチャネル(文字列はsubscribe)を使用して、異なるプロセスからの出力を分離することができます。

また、あなたがしたい場合は、Redisの中

redis_instance.rpush('process log', message) 

をお使いのログ出力を保存し、後で完全な形でそれを取得することができます。

5

私はそれを行う方法を見る一つの方法は、(docsを参照標準エラー出力と標準出力のために使用されるカスタムロガーを作成することです:

​​

あなたのロガーは、データベースへのMemcachedのデータを保存することができ、 Redisのか、何でもあなたがデータを取得するために使用します共有ストレージを

私はloggerの構造についてはよく分からないが、私はこのような何かがうまくいくと思います:。

from logging import Logger 

class MyLogger(Logger): 
    def log(lvl, msg): 
     # Do something with the message 
+0

@anand_trexこれを試しましたか? – AJP

+0

私は、ロギングと結果のストリーミングを混合するのがとても快適ではありません。これは一種のハックなので、私はこの解決法を使用しません。 –

2

これは古い質問ですが、まだこの特定のトピックについての唯一の結果です。

は、ここで私はRedisの

class RedisFileObject(object): 
    def __init__(self, _key): 
     self.connection = redis.Redis() 
     self.key = _key 
     self.connection.publish('debug', 'Created channel %s' % self.key) 

    def write(self, data): 
     self.connection.publish(self.key, data) 

    def close(self): 
     pass 

私は私のすべてのタスクは、様々な機能税込を継承するBaseTaskを持っている上で、特定のチャンネルに公開する単純なファイルのようなオブジェクトを作成し 、私はそれについていった方法です。これはstdoutとstderrをRedisファイルのようなオブジェクトに置き換えます。

標準出力/標準エラー出力に書き込まれたものでそこから
def capture_output(self): 
    sys.stdout = RedisFileObject(self.request.id) 
    sys.stderr = RedisFileObject(self.request.id) 

は、タスクidの名にちなんで名付けられRedisのチャネルに転送されます。

関連する問題