2016-12-11 10 views
1

私はpython barrierを検索しましたが、関連する問題はほとんどありません。私はまだ、barrier.wait()について混乱しています。私のコードも動作します。"Python threading barrier"なぜこのコードがうまくいっていますか?もっと良い方法がありますか?

私はこのような機能を実装するためにPythonバリアを利用しています:メインスレッドとnサブスレッド。各ラウンドでは、メインスレッドは現在の作業を終了するすべてのサブスレッドを待ちます。そして、すべてのスレッドは、条件を満たすまで次のラウンドに進みます。したがって、私は障壁がこの関数を実装するのが適切であることを発見しました、ここで私のコードはメインスレッドです。

def superstep(self): 
    workers = [] 
    barrier = threading.Barrier(self.num_workers+1) 
    for vertex in self.vertices: 
     worker = Worker(vertex, barrier) 
     workers.append(worker) 
     worker.start() 

    while self.flag: 
     barrier.wait() 
     self.distributeMessages() 
     self.count += 1 
     print ("superstep: ", self.count) 
     self.flag = self.isTerminated() 

    for worker in workers: 
     worker.flag = False 

    for worker in workers: 
     worker.join() 
  1. ループは労働者という名前のn個のスレッドを作成し、リストの労働者に保存されている 'の初。
  2. 'while'ループは、他のサブスレッドを待つメインスレッドであり、self.flagがFalseのときにブレークします。
  3. 2番目の 'for'ループは、各ワーカー(サブスレッド)でフラグをFalseに設定し、ループを終了するように指示します。

ここは私のWorkerクラスです。

class Worker(threading.Thread): 
    def __init__(self, vertex, barrier): 
     threading.Thread.__init__(self) 
     self.vertex = vertex 
     self.flag = True 
     self.barrier = barrier 

    def run(self): 
     while self.flag: 
      self.barrier.wait() 
      do something 

コードはうまくいき、すべてのスレッドが参加できます()。しかし、python barrierを見ると、すべてのスレッドがwait()を呼び出すと同時にすべてのスレッドが解放されます。メインスレッドがwhileループから壊れ、他のすべてのスレッドがまさにそれを待っている場合は、2番目の 'for'ループは役に立たず、サブスレッドは決してjoin()しません。

このコードはどのように機能しますか?BrokenBarrierErrorを発生させる代わりに、バリアを終了する方法はありますか?さらに、2番目の 'for'ループにいくつかのコードを追加したり、何らかの情報を印刷したりすると、そのプロシージャはブロックされます。私はwait()にあるいくつかのサブスレッドがあり、フラグをチェックする機会がないと思うので、スレッドのrun()を終了できません。

+0

待っている労働者を解放するために、2番目の後に 'barrier.abort()'を呼び出すことができます。 – Gribouillis

+0

@Gribouillisご返信ありがとうございます。 barrier.abort()はBrokenBarrierErrorを発生させ、コードが実行されるのを防ぐので、より良い方法があるかどうか疑問です。 – fancyqlx

+0

ワーカースレッドでBrokenBarrierErrorを捕まえることができます – Gribouillis

答えて

1

abortを使用しない場合は、各スレッドにBarrier.waitを2回コールすることができます。これにより、操作が2つの部分に分割されます。最初の部分では、ワーカースレッドは作業を行い、メインスレッドはフラグステータスを更新します。 2番目の部分では、すべてのスレッドがフラグの状態をチェックし、必要に応じてループを終了します。それはこのようなものになり、コードレベルで

# Main 
def superstep(self): 
    workers = [] 
    barrier = threading.Barrier(self.num_workers+1) 
    for vertex in self.vertices: 
     worker = Worker(vertex, barrier) 
     workers.append(worker) 
     worker.start() 

    while self.flag: 
     barrier.wait() 
     self.distributeMessages() 
     self.count += 1 
     print ("superstep: ", self.count) 
     self.flag = self.isTerminated() 
     for worker in workers: 
      worker.flag = self.flag 
     barrier.wait() 

    for worker in workers: 
     worker.join() 

# Worker 
def run(self): 
    while self.flag: 
     self.barrier.wait() 
     # do something 
     self.barrier.wait() 
+0

ありがとうございます。このソリューションはうまくいき、多くのことを学びます。 – fancyqlx

1

をあなたはループのための第二の後に待っている労働者を解放し、労働者のrun()方法でBrokenBarrierErrorをキャッチする

self.barrier.abort() 

を呼び出すことができます。

+0

もう一度ありがとう、私が前に受け入れた答えはまた非常に美しいです。 – fancyqlx

+0

大丈夫です。 – Gribouillis

関連する問題