2012-04-26 14 views
1

私は、リソースの幅優先探索を実行するアルゴリズムがあります。スレッド〜幅優先処理

def crawl(starting_node) 
    items=[starting_node] 
    until items.empty? 
    item = items.shift 
    kids = item.slow_network_action # takes seconds 
    kids.each{ |kid| items << kid } 
    end 
end 

を私はslow_network_actionを並列化するために、いくつかの同時実行スレッドを使用したいと思います。
これを行うにはどのような方法が適していますか?

はここを働く技術だが、私は、特定の感じ正しいアプローチではありません:に項目を待っている間に、スレッドが実際に寝ているよう

def crawl(starting_node) 
    mutex = Mutex.new 
    items = [starting_node] 
    4.times.map{ 
    loop do 
     unless item=mutex.synchronize{ items.shift } 
     sleep LONGER_THAN_LONGEST_NETWORK_ACTION 
     break unless item=mutex.synchronize{ items.shift } 
     end 
     kids = item.slow_network_action 
     mutex.synchronize{ 
     kids.each{ |kid| items << kid } 
     } 
    end 
    }.each(&:join) 
end 

私が何かをしたいと思いますキューに追加され、アイテムが追加されたときに起動し、誰もが待機しているとき、追加されていないときにすべてのスレッドを終了させる。


この代替コードはほとんど動作しますが(とない)が発生することができ、デッドロック、および適切な出口戦略の合計不足のため:

require 'thread' 
def crawl(starting_node) 
    items = Queue.new 
    items << starting_node 
    4.times.map{ 
    while item=items.shift 
     kids = item.slow_network_action 
     kids.each{ |kid| items << kid } 
    end 
    }.each(&:join) 
end 
+0

モニターと条件変数を調べる必要があります。http://www.ruby-doc.org/stdlib-1.9.3/libdoc/monitor/rdoc/Monitor.html。私はより詳細な答えを書くだろうが、私は寝るつもりです。 – matt

答えて

2

これは正しい方向にあなたを指している必要があります:

require 'monitor' 

NUM_THREADS = 4 

def crawl(starting_node) 
    items = [starting_node] 
    items.extend MonitorMixin 
    item_cond = items.new_cond 

    threads = [] 
    working_threads = 0 
    finished = false 

    NUM_THREADS.times do 
    items.synchronize do 
     working_threads += 1 
    end 
    threads << Thread.new do 
     item = nil 
     kids = [] 
     loop do 
     items.synchronize do 

      #add any new items to array 
      items.concat kids 

      if (items.empty? && working_threads == 1) 
      #all other threads are waiting, and there's no more items 
      #to process, so we must be done 
      finished = true 
      end 

      #wake up all waiting threads, either to finish or do more work 
      #watch out for thundering herds 
      item_cond.broadcast unless (items.empty? && !finished) 

      #wait, but first decrement count of working threads 
      #so we can determine when to finish 
      working_threads -= 1 
      item_cond.wait_while { items.empty? && !finished} 
      Thread.exit if finished 
      working_threads += 1 

      #get next item 
      item = items.shift 
     end 

     kids = item.slow_network_action 
     end 

    end 
    end 

    threads.each(&:join) 
end 

これはmonitoritemsアレイを作成し、それを介して任意の同期を行い、asociatedと共にモニターからを作成します。

これはQueueが内部的にどのように動作するかに似ていますが、これはすべての作業が終了した時点をチェックする点が異なります(実際には少し複雑です)。

スレッドのメインループは、に追加された空のkids配列で始まり、ループ内に2つの別々の同期ブロックが必要となることを避けるために、競合状態が発生します。

broadcastを使用すると、すべての待機スレッドが復帰し、潜在的にthundering herdが発生することに注意してください。私はこれがここで何か問題を引き起こすとは思わない。代わりに、一度にkidsの要素を1つずつ追加し、それぞれに対してsignalを呼び出します。これは、すべての作業が終了したときに対処するためにより複雑になります。

+0

ありがとうございます。私はあなたが合格マークを得る前にこれを読んで、テストし、消化しなければなりませんが、合理的に見えます。 – Phrogz

関連する問題