2017-06-21 15 views
0

私はPython3、Flask 0.12、Pytest 3.0.7を使用しています。フラスコサーバーが別のスレッドを実行すると、Pytestがハングアップする

私はこれに似たフラスコアプリがあります。私は次のようにpytestを使用して、このアプリをテストしようとしています

class AppInitializer: 
    def __init__(self): 
     pass 

    @staticmethod 
    def __function_to_be_refreshed(): 
     while True: 
      try: 
       time.sleep(5) 
      except Exception as e: 
       logger.exception(e) 


    def create_app(self): 
     daemon_thread = threading.Thread(target=self.__function_to_be_refreshed) 
     daemon_thread.daemon = True 
     daemon_thread.start() 
     atexit.register(daemon_thread.join) 
     app_ = Flask(__name__) 
     return app_ 


app_initializer = AppInitializer() 
app = app_initializer.create_app() 

を:

import unittest 

import pytest 


class TestCheckPriceRequestAPI(unittest.TestCase): 
    def setUp(self): 
     self.app = api.app.test_client() 

    def test_random(self): 
     pass 

私はpytestを使用してこのテストを実行すると、このテストを(他のすべてのテストとともに)正常に実行されますが、pytestはハングします。実行中のpytestプロセスを停止する方法(またはデーモンスレッドを終了させる方法)

+0

まあフラスコアプリには、ビジネスの産卵スレッド – e4c5

+0

e4c5 @を持っていない:それは、プロジェクトの追加要件でした。 – tusharmakkar08

答えて

2

joinコマンドは、スレッドが完了するまで待機するが、終了しないことを意味します。無限ループでスレッドを終了するには、次のように行うことができます:

class AppInitializer: 
    def __init__(self): 
     self._run = True 
     self._daemon_thread = None 

    def __function_to_be_refreshed(self): 
     while self._run: 
      try: 
       time.sleep(5) 
      except Exception as e: 
       logger.exception(e) 

    def __shutdown(self): 
     self._run = False 
     if self._daemon_thread: 
      self._daemon_thread.join() 

    def create_app(self): 
     self._daemon_thread = threading.Thread(target=self.__function_to_be_refreshed) 
     ... 
     atexit.register(self.__shutdown) 
     ... 
+0

スレッドの終了と終了の間に混乱しました。提案ありがとう:) – tusharmakkar08

関連する問題