2010-12-28 5 views
1

私は3つのシェルスクリプトP1、P2、P3をチェーン化しようとしています。これらの3つのシェルスクリプトは連続して実行する必要がありますが、いつでも複数のP1とP2とP3を実行することができます。Queue、threading.Threadとサブプロセスを使用している私のマルチスレッドのpythonスクリプトは、ひどいです。

私はこれらを数十個のファイルで実行する必要があります。したがって、スレッドを使用して並行して処理する必要があります。

私はこれを達成するために、Pythonスレッド、キュー、およびサブプロセスモジュールを使用しています。

私の問題は、スレッドカウントが1より大きい場合、プログラムが不安定に動作し、スレッドが再現可能な方法で互いにハンドオフしない場合です。 5つのスレッドすべてが完璧に動作し、完了まで働いています。

これはスレッドを使用して何かを行う私の最初の試みであり、これは競合条件を含むスレッドの通常の問題のためです。しかし私は自分のコードを整理する方法を知りたい。

実際のコードは(https://github.com/harijay/xtaltools/blob/master/process_multi.py)です。擬似コードは以下の通りです。コードが厄介であれば申し訳ありません。

私の質問はなぜこのデザインを使用して異常な動作をするのですか?スレッドは、いつでも異なるファイルにアクセスしています。また、シェルスクリプトが終了し、生成されたファイルがディスクに書き込まれた場合にのみ、subprocess.callが返されます。

どうすればいいですか? 私はここで私のデザインをできるだけ簡潔に説明しようとしました。ファイル入力を使用し、使用して新しい出力ファイルを生成するシェルスクリプトを実行するための

P1_Queue = Queue() 
P2_Queue = Queue() 
P3_Queue = Queue() 

class P1_Thread(Thread): 
    def __init__(self,P1_Queue,P2_Queue): 
     Thread.__init__(self) 
     self.in_queue = P1_Queue 
     self.out_queue = P2_Queue 

    def run(self): 
     while True: 
      my_file_to_process = self.in_queue.get() 
      if my_file_to_process = None: 
       break 
      P1_runner = P1_Runner(my_file_to_process) 
      P1_runner.run_p1_using_subprocess() 
      self.out_queue.put(my_file_to_process) 

クラスP1のランナーは、入力ファイルハンドルを取り、その後、subprocess.callを(呼び出す):

私の基本設計run_p1_using_subprocessメソッド。彼らは「didnの理由だけで

my_file_to_process = self.in_queue.get() 
    if my_file_to_process = None: # my sister ate faster than I did, so... 
     break # ... I kill myself! 

スレッドが死んされています。スレッドの終了条件は、別のスレッドがその入力キューを空にするとき、彼らが自殺を作る

class P1_runner(object): 

    def __init__(self,inputfile): 
     self.my_shell_script = """#!/usr/bin/sh 
prog_name <<eof 
input 1 
... 
eof""" 
     self.my_shell_script_file = open("some_unique_p1_file_name.sh") 
     os.chmod("some_unique_file_name.sh",0755) 

    def run_p1_using_subprocess(self): 
     subprocess.call([self.my_shell_script_file]) 

I have essentially similar classes for P2 and P3 . All of which call a shell script that is custom generated 

The chaining is achieved using a series of Thread Pools. 
p1_worker_list = [] 
p2_worker_list = [] 
p3_worker_list = [] 

for i in range(THREAD_COUNT): 
    p1_worker = P1_Thread(P1_Queue,P2_Queue) 
    p1_worker.start() 
    p1_worker_list.append(p1_worker) 

for worker in p1_worker_list: 
    worker.join() 

And then again the same code block for p2 and p3 

for i in range(THREAD_COUNT): 
    p2_worker = P2_Thread(P2_Queue,P3_Queue) 
    p2_worker.start() 
    p2_worker_list.append(p1_worker) 

for worker in p2_worker_list: 
    worker.join() 

おかげであなたの助け/アドバイストン

+0

これはあなたのコードではありません。 'if my_file_to_process = None:'は構文エラーです。 – Falmarri

+0

申し訳ありませんコードは説明のためのものです - 構文エラー – harijay

答えて

2

まあ、これは本当に悪いです:

runner.run() 

あなたが今まで手動でスレッドのrunメソッドを呼び出してはいけません。あなたは.start()でスレッドを開始します。あなたのコードは大規模な混乱であり、あなたのエラーを見つけるためにここを歩き回る人はいません。

+0

ポインタありがとうございますが、これはまさに私が探していたものです。 – harijay

+0

ありがとうFalmarriとApalala。コードをきれいに整理しました。 https://github.com/harijay/auriga。これはOSX上でシームレスに動作します。私はまだ "OSError:[Errno 26] Text file busy"をsubprocess.check_call()でランダムに取得します。私はそれに相当するPopen呼び出しで置き換えようとしましたが、常にubuntuでエラーが発生します – harijay

+0

@Downvoter:-1を説明するケア? – Falmarri

1

彼らがより多くの準備が整った時に行うべき仕事を見つける。

代わりに、入力キューのイベントが通知されるまでスレッドをスリープ(待機)する必要があります。オーケストレーター(メインプログラム)が処理が完了したことを通知したときにのみダイを待ちます(自殺フラグを設定し、キュー)。

(私はすでにコードを変更しています) @Falmarriは、おそらくあなたのコード内のthreadingライブラリ全体の使用が間違っているので、あなたの質問は、特定の問題(他の人が答えることができるもの)ではないということです、そしてプログラミングの使用他の場所で彼のノートに何を意味

一般的に言葉は厄介です。たとえば:worker.join()

  • コールはメインプログラムが並行処理で任意の試みを破っので、P2のスレッドを起動する前に、順番に、すべてのP1スレッドの終了を待つことができます。
  • Thread.run()を無効にするか、コンストラクターに呼び出し可能にする必要があります。 Pn_runnerクラスは不要です。
  • すべてのスレッドクラスは同じです。プロセスステージごとに異なるクラスは必要ありません。
  • 既にPythonを使用している場合は、純粋なPythonで簡単に作業を行うことができない限り、外部プログラム(シェルスクリプトはほとんどありません)を呼び出すことは意味がありません。
  • 上記の理由から、プログラムをシェルシステムに書き込むのは非常に奇妙で、ほとんど必要ありません。私はあなたのこの特定の問題を解決するために行うことをお勧め何

は次のとおりです。

  1. 100%のPythonに固執するようにしてください。あなたができないか、それほど難しいと思われる場合は、少なくとも外部からアクセスしなければならない特定の機能が見つかったでしょう。
  2. 並行性を使用しないソリューションを構築します。
  3. プログラムのパフォーマンスを測定し、アルゴリズム的に改善してください。
  4. できればスレッディングを避けてください。 CPUにバインドされたプログラムは、スレッドを使わずに利用可能なすべてのサイクルを食べます。あまりにもディスクにバインドされている(または外部/リモートのリソースにバインドされている)プログラムは、何もする必要がなければ、ディスクを待つことになります。スレッド化の恩恵を受けるには、計算と外部リソース使用の間で適切なバランスが取られていなければなりません(または、ビジー状態でも要求が到着してもサービスを提供できる必要があります)。
  5. pythonic way:シンプルにして、徐々に機能性と複雑性を高めながら、常に複雑なものは避けてください。

あなたの意図がPythonでのスレッド化について教えているのであれば、ぜひ試してみてください。そして、あなたが望むのは、複数のシェルスクリプトを同時に実行することだったならば、bashと他のシェルにはすでにそのための規定があり、Pythonを使う必要はありません。

+0

を編集しましたスレッドをスリープ状態にする方法がわかりません。またスレッドは自動的にスティックされますか? – harijay

+0

my_file_to_process =なし:sleep(sleep_time) で十分ですが、素朴です。私は自分自身でPythonライブラリを学習しています。私はすでにそこに必要なものがあることを知っています。ミューテックスの標準ライブラリを見てください。また、スレッドセーフなデータ構造では同時監視はできません。 – Apalala

+0

http://docs.python.org/library/threading.htmlもご覧ください。 – Apalala

関連する問題