2017-12-07 7 views
1

いくつかのゴルーチンにいくつかの負荷を分散したいと思います。タスクの数が事前に分かっている場合は、整理するのが簡単です。たとえば、私は待っているグループとファンをすることができます。golangとの再帰的同時実行

nTasks := 100 
nGoroutines := 10 

// it is important that this channel is not buffered 
ch := make(chan *Task) 
done := make(chan bool) 
var w sync.WaitGroup 
// Feed the channel until done 
go func() { 
    for i:= 0; i < nTasks; i++ { 
     task := getTaskI(i) 
     ch <- task 
    } 
    // as ch is not buffered once everything is read we know we have delivered all of them 
    for i:=0; i < nGoroutines; i++ { 
     done <- false 
    } 
}() 
for i:= 0; i < nGoroutines; i ++ { 
    w.Add(1) 
    go func() { 
     defer w.Done() 
     select { 
     case task := <-ch: 
      doSomethingWithTask(task) 
     case <- done: 
      return 
     } 
    }() 
} 
w.Wait() 
// All tasks done, all goroutines closed 

ただし、私の場合、各タスクは完了するタスクが増えます。たとえば、クロールされたウェブからすべてのリンクを受け取ったクローラとします。私の最初の勘違いは、実行されたタスクと保留中のタスクの数を追跡するメインループを持つことでした。私が完了したら、すべてのゴルーチンに終了信号を送ります:

nGoroutines := 10 
ch := make(chan *Task, nGoroutines) 
feedBackChannel := make(chan * Task, nGoroutines) 
done := make(chan bool) 

for i:= 0; i < nGoroutines; i ++ { 
    go func() { 
     select { 
     case task := <-ch: 
      task.NextTasks = doSomethingWithTask(task) 
      feedBackChannel <- task 
     case <- done: 
      return 
     } 
    }() 
} 

// seed first task 
ch <- firstTask 
nTasksRemaining := 1 

for nTasksRemaining > 0 { 
    task := <- feedBackChannel 
    nTasksRemaining -= 1 
    for _, t := range(task.NextTasks) { 
     ch <- t 
     nTasksRemaining++ 
    } 
} 
for i:=0; i < nGoroutines; i++ { 
    done <- false 
} 

ただし、これによりデッドロックが発生します。たとえば、NextTasksがゴルーチンの数より大きい場合、最初のタスクが終了するとメインループが停止します。しかし、mainLoopが書き込みを待っているのでフィードバックがブロックされているため、最初のタスクは終了できません。

これは、非同期的にチャンネルに投稿するのが簡単な方法の1つです。の代わりにgo func() {feedBackChannel <- task}()を実行します。さて、これはひどいハックのように感じます。特に、何十万というタスクがあるかもしれないからです。

このデッドロックを回避するにはどうすればよいでしょうか?私は並行性パターンを探しましたが、大部分は、後の段階が以前のステップに影響を与えないような、ファンアウトやパイプラインのような単純なものです。

+1

あなたの説明は少し複雑すぎて完全には理解できませんが、私には2つのノートがあります。 1.間違ってゴートラインの中でwaitGroup.Add()を実行すると、それを呼び出す前に完了する必要があります。私は通常、goroutineが開始されるとただちにwaitGroup.Done()を呼び出します。なぜfeedbackChannelが必要なのかは不明です。私には、必要に応じて新しいgrorutinesを単に生成する必要があると思われ、メインスレッドではwatGroup.Wait()を実行します。しかし、私はいくつかの要件を欠いているかもしれません。 –

+0

@AlexanderTrakhimenok新しいゴルーチンを生み出すことはできますが、ゴルーチンを再利用し、その量を制限することでより少ないリソースを消費すると思います。私の場合、私は数十万のタスクの順番で期待しています。 (私は待機グループを修正しました) –

+0

Goルーチンは非常に軽いですが、何の問題もなく数百万のゴルーチンを生成するのが普通です。あなたはレート制限パターンを調べることができますが、 –

答えて

0

問題を正しく理解していれば、ソリューションはかなり複雑です。ここにいくつかの点があります。それが役に立てば幸い。

  • コメントに記載されている人のように、ゴルーチンを起動するのは安いです(メモリとスイッチの両方がOSレベルよりずっと安いです)。いくつかの理由でワーカーのゴルーチンを使いたいとしましょう。
  • 代わりに行われたチャネルあなた可能性だけ近い chチャンネルの
  • 、代わりのselectあなただけのrange自分のチャンネルになったタスクを超えます。
  • chfeedBackChannelの分離のポイントが表示されないので、すべてのタスクをchにプッシュして容量を増やしてください。
  • 新しいタスクをエンキューしようとすると、デッドロックが発生する可能性があります。私の解決策はかなり素朴です。オーバーフローしないことが確実になるまで容量を増やしてください(cap(ch) - len(ch) < thresholdの場合は警告を記録することもできます)。 100万の容量を持つ(ポインタの)チャンネルを作成すると、RAMの約8 * 1e6 ~= 8MBが必要になります。
+0

と思ったどのくらい多くのタスクが残っているかを追跡する方法のfeedBackChannel。私はちょうどデフォルトで選択を行うことはできません。それは、もし私がある時点でタスクが不足していると、ゴルーチンが閉じ始めることを意味するからです。だから、私はそれを1つのタスクに集中させようとしました。 チャネルをたくさんバッファリングすることは、実際にはかなり可能です。あなたが言うように、チャネルのメモリフットプリントは本当に小さいです。 私がしたいと思うタスクはIOではなく、むしろCPU(索引を並列化している)なので、いつでもゴルーチンを開くことはおそらく影響を与えます。 –

+0

@GabrielFurstenheimあなたの問題に合っているかどうかわからないので、 'ch'のすべてのチャンネルをマージすると、' len(ch) 'でタスク数を得ることができます。後で時間があればコードサンプルを追加しようとします。 –