2011-07-26 4 views
3

私は、Pythonバインディングを使用してzeroMQでパイプラインパターンを実装しています。ワーカープロセスが常にzeroMQで終了することを確認する

タスクは、このような無限ループを使用して新しいタスクをリッスン労働者にファンアウトされています

while True: 
     socks = dict(self.poller.poll()) 
     if self.receiver in socks and socks[self.receiver] == zmq.POLLIN: 
      msg = self.receiver.recv_unicode(encoding='utf-8') 
      self.process(msg) 
     if self.hear in socks and socks[self.hear] == zmq.POLLIN: 
      msg = self.hear.recv() 
      print self.pid,":", msg 
      sys.exit(0) 

彼らはシンクノードからメッセージを取得するときに、彼らが期待されるすべての結果を受け取った確認し、終了します。

ただし、このようなメッセージが表示されず、終了しないことがあります。すでに述べたメッセージ以外に、処理すべき他のタスクがないことを知る方法がないときに、労働者が常に終了するようにする最良の方法は何ですか?ここで

は、私は労働者の状態を確認するために書いたテストコードです:

#-*- coding:utf-8 -*- 
""" 
Test module containing tests for all modules of pypln 

""" 
import unittest 
from servers.ventilator import Ventilator 
from subprocess import Popen, PIPE 
import time 
class testWorkerModules(unittest.TestCase): 
    def setUp(self): 
     self.nw = 4 
     #spawn 4 workers 
     self.ws = [Popen(['python', 'workers/dummy_worker.py'], stdout=None) for i in range(self.nw)] 
     #spawn a sink 
     self.sink = Popen(['python', 'sinks/dummy_sink.py'], stdout=None) 
     #start a ventilator 
     self.V = Ventilator() 
     # wait for workers and sinks to connect 
     time.sleep(1) 

    def test_send_unicode(self): 
     ''' 
     Pushing unicode strings through workers to sinks. 
     ''' 

     self.V.push_load([u'são joão' for i in xrange(80)]) 
     time.sleep(1) 
     #[p.wait() for p in self.ws]#wait for the workers to terminate 
     wsr = [p.poll() for p in self.ws] 
     while None in wsr: 
      print wsr, [p.pid for p in self.ws if p.poll() == None] #these are the unfinished workers 
      time.sleep(0.5) 
      wsr = [p.poll() for p in self.ws] 
     self.sink.wait() 
     self.sink = self.sink.returncode 
     self.assertEqual([0]*self.nw, wsr) 
     self.assertEqual(0, self.sink) 

if __name__ == '__main__': 
    unittest.main() 
+0

また、このテストの厄介な副作用は、それが終了しないと、それがゾンビ後ろのプロセス(従業員またはシンク) – fccoelho

答えて

1

すべてのメッセージングのものは、最終的には、ハートビートで終わります。あなたが作業する必要があるコンポーネントが死んでいることをワーカーやシンクなどが発見した場合、基本的にどこかに接続しようとするか、自分を殺すことができます。だから、あなたが労働者としてシンクがなくなったことを発見したら、ただ終了してください。これは、シンクがまだ存在していても接続が切断されていても終了することを意味します。しかし、私はあなたがより多くを行うことができるとは思わない、おそらくすべてのタイムアウトをより合理的に設定する...

関連する問題