2011-06-26 15 views
9

私はマルチスレッド設計(選択肢はありません)を使用していますが、私のコードの大部分は単一のスレッドにあり、その中のすべてのイベントはqueueで管理されています。このようにして、私のコードのほとんどはシングルスレッドであるかのように動作し、ロック、セマフォ、その他のことについて心配する必要はありません。Pythonマルチスレッドunittesting

私はコードをunittestする必要があります(最初はTDDingをしないでください)、私は迷っています。別のスレッドで何かをテストしますか? ?例えば

、私は次のクラス持っていると言う:

class MyClass(): 
    def __init__(self): 
     self.a=0 
     # register event to self.on_event 

    def on_some_event(self, b): 
     self.a += b 

    def get(self): 
     return self.a 

をし、私がテストしたい:

import unittest 
from queued_thread import ThreadedQueueHandler 

class TestMyClass(unittest.TestCase): 
    def setUp(self): 
     # create the queued thread and assign the queue to self.queue 

    def test_MyClass(self): 
     mc = MyClass() 
     self.queue.put({'event_name':'some_event', 'val':1}) 
     self.queue.put({'event_name':'some_event', 'val':2}) 
     self.queue.put({'event_name':'some_event', 'val':3}) 
     self.assertEqual(mc.get(),6) 

if __name__ == '__main__': 
    unittest.main() 

MyClass.get()キューに入れられたスレッド内の何のため正常に動作しますが、それは非同期的に呼び出されますテストによってメインスレッドでは、結果が正しくない可能性があります!

答えて

6

あなたの設計がすべてがキューを通過しなければならないと仮定したら、それを戦わないでください - すべてを通り抜けてください!

あなたのキューに入れられたイベントハンドラにon_callイベントを追加し、そこに次の関数を登録します。

def on_call(self, callback): 
    callback() 

、その後にテストを変更します。

def test_MyClass(self): 
    def threaded_test(): 
     self.assertEqual(mc.get(),6) 

    mc = MyClass() 
    self.queue.put(1) 
    self.queue.put(2) 
    self.queue.put(3) 
    self.queue.put({'event_name':'call','val':threaded_test}) 
2

あなたがでtest_threading.pyで見ることができあなたがしようとしているものに似た何かをするstdlibテスト。基本的な考え方は、テスト条件がアサートされる前に実行が完了するように、スレッド実行をミューテックスおよびセマフォで保護することです。

+0

これはチュートリアル\ docを知っていますか? – Jonathan

+0

私は自分自身にリンクしたコードを実行する方が簡単ですし、python.orgのドキュメントにあるドキュメントのスレッディングに関する助けを借りるかもしれません。 –