私は3つのシェルスクリプトP1、P2、P3をチェーン化しようとしています。これらの3つのシェルスクリプトは連続して実行する必要がありますが、いつでも複数のP1とP2とP3を実行することができます。Queue、threading.Threadとサブプロセスを使用している私のマルチスレッドのpythonスクリプトは、ひどいです。
私はこれらを数十個のファイルで実行する必要があります。したがって、スレッドを使用して並行して処理する必要があります。
私はこれを達成するために、Pythonスレッド、キュー、およびサブプロセスモジュールを使用しています。
私の問題は、スレッドカウントが1より大きい場合、プログラムが不安定に動作し、スレッドが再現可能な方法で互いにハンドオフしない場合です。 5つのスレッドすべてが完璧に動作し、完了まで働いています。
これはスレッドを使用して何かを行う私の最初の試みであり、これは競合条件を含むスレッドの通常の問題のためです。しかし私は自分のコードを整理する方法を知りたい。
実際のコードは(https://github.com/harijay/xtaltools/blob/master/process_multi.py)です。擬似コードは以下の通りです。コードが厄介であれば申し訳ありません。
私の質問はなぜこのデザインを使用して異常な動作をするのですか?スレッドは、いつでも異なるファイルにアクセスしています。また、シェルスクリプトが終了し、生成されたファイルがディスクに書き込まれた場合にのみ、subprocess.callが返されます。
どうすればいいですか? 私はここで私のデザインをできるだけ簡潔に説明しようとしました。ファイル入力を使用し、使用して新しい出力ファイルを生成するシェルスクリプトを実行するための
P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()
class P1_Thread(Thread):
def __init__(self,P1_Queue,P2_Queue):
Thread.__init__(self)
self.in_queue = P1_Queue
self.out_queue = P2_Queue
def run(self):
while True:
my_file_to_process = self.in_queue.get()
if my_file_to_process = None:
break
P1_runner = P1_Runner(my_file_to_process)
P1_runner.run_p1_using_subprocess()
self.out_queue.put(my_file_to_process)
クラスP1のランナーは、入力ファイルハンドルを取り、その後、subprocess.callを(呼び出す):
私の基本設計run_p1_using_subprocessメソッド。彼らは「didnの理由だけで
my_file_to_process = self.in_queue.get()
if my_file_to_process = None: # my sister ate faster than I did, so...
break # ... I kill myself!
スレッドが死んされています。スレッドの終了条件は、別のスレッドがその入力キューを空にするとき、彼らが自殺を作る
class P1_runner(object):
def __init__(self,inputfile):
self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
self.my_shell_script_file = open("some_unique_p1_file_name.sh")
os.chmod("some_unique_file_name.sh",0755)
def run_p1_using_subprocess(self):
subprocess.call([self.my_shell_script_file])
I have essentially similar classes for P2 and P3 . All of which call a shell script that is custom generated
The chaining is achieved using a series of Thread Pools.
p1_worker_list = []
p2_worker_list = []
p3_worker_list = []
for i in range(THREAD_COUNT):
p1_worker = P1_Thread(P1_Queue,P2_Queue)
p1_worker.start()
p1_worker_list.append(p1_worker)
for worker in p1_worker_list:
worker.join()
And then again the same code block for p2 and p3
for i in range(THREAD_COUNT):
p2_worker = P2_Thread(P2_Queue,P3_Queue)
p2_worker.start()
p2_worker_list.append(p1_worker)
for worker in p2_worker_list:
worker.join()
おかげであなたの助け/アドバイストン
これはあなたのコードではありません。 'if my_file_to_process = None:'は構文エラーです。 – Falmarri
申し訳ありませんコードは説明のためのものです - 構文エラー – harijay