2017-10-23 20 views
2

私はPythonを使用してスレッドアプリケーションを実装しました。実行時に私はCTRL + Cのsigcallを捕まえてプログラムを終了したい。これを行うために、exit_gracefullyという関数を登録しました。この関数は、スレッドをより制御された方法で停止させます。しかし、それは動作していないようです。あなたが最初のいくつかのテスト・データを生成してもよい上記の例を試してみたい場合はキューからアイテムを引き出すスレッドを中止する方法PythonでCtrl + Cを使用する

import Queue 
import threading 
import signal 
import sys 
import time 

queue = Queue.Queue() 
workers = list() 

def callback(id, item): 
    print("{}: {}".format(id, item)) 
    time.sleep(1) 

def exit_gracefully(signum, frame): 
    print("Ctrl+C was pressed. Shutting threads down ...") 
    print("Stopping workers ...") 
    for worker in workers: 
     worker.stop() 
    sys.exit(1) 


class ThreadedTask(threading.Thread): 
    def __init__(self, id, queue, callbacks): 
     threading.Thread.__init__(self) 
     self._stop_event = threading.Event() 
     self.id = str(id) 
     self.queue = queue 
     self.callbacks = callbacks 
     self._stopped = False 

    def run(self): 
     while not self.stopped(): 
      item = self.queue.get() 
      for callback in self.callbacks: 
       callback(self.id, item) 
      self.queue.task_done() 

    def stop(self): 
     self._stop_event.set() 
     self._stopped = True 

    def stopped(self): 
     return self._stop_event.is_set() or self._stopped 


def main(input_file, thread_count, callbacks): 
    print("Initializing queue ...") 
    queue = Queue.Queue() 

    print("Parsing '{}' ...".format(input_file)) 
    with open(input_file) as f: 
     for line in f: 
      queue.put(line.replace("\n", "")) 

    print("Initializing {} threads ...".format(thread_count)) 
    for id in range(thread_count): 
     worker = ThreadedTask(id, queue, callbacks) 
     worker.setDaemon(True) 
     workers.append(worker) 

    print("Starting {} threads ...".format(thread_count)) 
    for worker in workers: 
     worker.start() 

    queue.join() 


if __name__ == '__main__': 
    signal.signal(signal.SIGINT, exit_gracefully) 
    print("Starting main ...") 
    input_file = "list.txt" 
    thread_count = 10 
    callbacks = [ 
     callback 
    ] 
    main(input_file, thread_count, callbacks) 

seq 1 10000 > list.txt 
ここに私が働いている例だハンドラが

呼び出されることはありませんようです

何か助けていただければ幸いです!

+0

なぜKeyboardInterruptで 'pass'するのですか?私は私の終了コードのKeyboardInterruptを使用します。 – Marichyasana

+0

@Marichyasana:Thx、try-catch-blockはここでは必要ないと思います。削除しましたが、Ctrl + Cはまだ動作しません。 – rednammoc

答えて

3

ここではうまくいくソリューションです。

timeoutが設定されていない場合、Queue.get()SIGINTを無視します。それはここに書かれています:https://bugs.python.org/issue1360

もう1つの問題は、Queue.join()も無視するようです。SIGINT。私はそれが空であるかどうかを見るためにループ内のキューをポーリングすることによって取り組みました。

これらの問題は、Python 3に

を固定されているように見える私はまた、シャットダウンするすべてのスレッドを伝えるためにSIGINTハンドラ内で使われている共有イベントを追加しました。

import Queue 
import signal 
import sys 
import threading 
import time 


def callback(id, item): 
    print '{}: {}'.format(id, item) 
    time.sleep(1) 


class ThreadedTask(threading.Thread): 

    def __init__(self, id, queue, run_event, callbacks): 
     super(ThreadedTask, self).__init__() 
     self.id = id 
     self.queue = queue 
     self.run_event = run_event 
     self.callbacks = callbacks 

    def run(self): 
     queue = self.queue 
     while not self.run_event.is_set(): 
      try: 
       item = queue.get(timeout=0.1) 
      except Queue.Empty: 
       pass 
      else: 
       for callback in self.callbacks: 
        callback(self.id, item) 
       queue.task_done() 


def main(): 
    queue = Queue.Queue() 
    run_event = threading.Event() 
    workers = [] 

    def stop(): 
     run_event.set() 
     for worker in workers: 
      # Allow worker threads to shut down completely 
      worker.join() 

    def sigint_handler(signum, frame): 
     print '\nShutting down...' 
     stop() 
     sys.exit(0) 

    signal.signal(signal.SIGINT, sigint_handler) 

    callbacks = [callback] 

    for id in range(1, 11): 
     worker = ThreadedTask(id, queue, run_event, callbacks) 
     workers.append(worker) 

    for worker in workers: 
     worker.start() 

    with open('list.txt') as fp: 
     for line in fp: 
      line = line.strip() 
      queue.put(line) 

    while not queue.empty(): 
     time.sleep(0.1) 

    # Update: Added this to gracefully shut down threads after all 
    # items are consumed from the queue. 
    stop() 


if __name__ == '__main__': 
    main() 
+0

Thx!魅力的な作品! – rednammoc

+1

私は別のエラーを認識しました。キューが空の場合、プロセスは停止しません。また、Ctrl + Cはここでは機能しません。 – rednammoc

+0

Queue.Empty例外の後にbreak文を追加すると、これが解決されます。どうも! – rednammoc

関連する問題