2017-08-25 12 views
0

google io 2010からロードバランサコードを取得し、バランサの優先キューと同期ロックの実装を追加しました。私は意図的にworkFnの機能遅延をrequesterよりも大きく設定しているので、どのように保留中の値が増加するのか分かります。私はそれをcliで実行し、すべてのワーカーが起動した後、すべてのワーカーに対して保留中の値でプログラムが停止し、何も表示されないことに気付きました。間違いがどこにあるのか分かりませんが、時々completedが1,2回呼び出されます。選択されたケースで<-b.doneが正しく処理されていないようです。単純なロードバランサが正しく動作しない

package main 

import (
    "container/heap" 
    "fmt" 
    "math/rand" 
    "os" 
    "sync" 
    "time" 
) 

var nWorker int32 = 6 

func main() { 
    rchanel := make(chan Request) 
    workers := Pool{ 
     {make(chan Request), 0, 0}, 
     {make(chan Request), 0, 1}, 
     {make(chan Request), 0, 2}, 
     {make(chan Request), 0, 3}, 
     {make(chan Request), 0, 4}, 
     {make(chan Request), 0, 5}, 
    } 
    doneChan := make(chan *Worker) 
    balancer := Balancer{workers, sync.Mutex{}, doneChan} 
    for _, elem := range workers { 
     go elem.work(doneChan) 
    } 
    go balancer.balance(rchanel) 
    go requester(rchanel) 

    var input string 
    fmt.Scanln(&input) 
} 

type Request struct { 
    fn func() int 
    c chan int 
} 

func requester(work chan Request) { 
    c := make(chan int) 
    for { 
     time.Sleep(time.Duration(rand.Int31n(nWorker)) * 2e4) 
     work <- Request{workFn, c} 
     go func() { 
      result := <-c 
      fmt.Fprintf(os.Stderr, "Done: %v \n", result) 
     }() 
    } 
} 

func workFn() int { 
    val := rand.Int31n(nWorker) 
    time.Sleep(time.Duration(val) * 2e8) 
    return int(val) 
} 

type Worker struct { 
    requests chan Request 
    pending int 
    index int 
} 

func (w *Worker) work(done chan *Worker) { 
    for { 
     req := <-w.requests 
     req.c <- req.fn() 
     done <- w 
    } 
} 

type Pool []*Worker 

func (p Pool) Less(i, j int) bool { 
    return p[i].pending < p[j].pending 
} 
func (p Pool) Swap(i, j int) { 
    p[i], p[j] = p[j], p[i] 
    p[i].index = i 
    p[j].index = j 
} 
func (p Pool) Len() int { return len(p) } 
func (p *Pool) Push(x interface{}) { 
    n := len(*p) 
    worker := x.(*Worker) 
    worker.index = n 
    *p = append(*p, worker) 
} 
func (p *Pool) Pop() interface{} { 
    old := *p 
    n := len(old) 
    item := old[n-1] 
    item.index = -1 
    *p = old[0 : n-1] 
    return item 
} 

type Balancer struct { 
    pool Pool 
    mu sync.Mutex 
    done chan *Worker 
} 

func (b *Balancer) dispatch(req Request) { 
    b.mu.Lock() 
    w := heap.Pop(&b.pool).(*Worker) 
    w.requests <- req 
    w.pending++ 
    heap.Push(&b.pool, w) 
    b.mu.Unlock() 
} 
func (b *Balancer) completed(w *Worker) { 
    b.mu.Lock() 
    w.pending-- 
    heap.Remove(&b.pool, w.index) 
    heap.Push(&b.pool, w) 
    b.mu.Unlock() 
} 

func (b *Balancer) balance(work chan Request) { 
    for { 
     select { 
     case req := <-work: 
      b.dispatch(req) 
      b.printStatus() 
     case w := <-b.done: 
      b.completed(w) 
      b.printStatus() 
     } 
    } 
} 

func (b *Balancer) printStatus() { 
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending) 
} 
+0

このコードは、 'Worker.work'でゴルーチンをリークします。最初の観察。問題を探しているコードを見直してください。 – RayfenWindspear

答えて

1

問題はbalance()ゴルーチンが、最終的に特定のWorkerbalance()を実行しているゴルーチンためのデッドロックを生産、done <- wwork()にブロックしていると同時に、w.requests <- reqdispatch()にブロックされるということです。

ここに必要な修正があります。 balance()は内部的にゴルーチンを利用する必要があります。ルーチンがdispatch()またはcompleted()でブロックするかどうかは問題にならないため、balance()のメインルーチンはchannelから継続します。

注:これは永久に行われるため、プレイグラウンドでは機能しません。

func (b *Balancer) balance(work chan Request) { 
    for { 
     select { 
     case req := <-work: 
      go func() { 
       b.dispatch(req) 
       b.printStatus() 
      }() 
     case w := <-b.done: 
      go func() { 
       b.completed(w) 
       b.printStatus() 
      }() 
     } 
    } 
} 

printStatus通話を同時に行うことができること、それは同様にmutexを利用するために必要、またはあなたがランダムpanic秒を取得します。 pending値はちょうど私の知る限り...増加し続けるなぜ私はちょうど把握できれば今

func (b *Balancer) printStatus() { 
    b.mu.Lock() 
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending) 
    b.mu.Unlock() 
} 

Worker.work()は今までWorkerdone <- wに待機する必要があるためpending0または1をことを可能にする必要がありますそれ以前に別のRequestdispatch()から得ることができます。私はこれが望ましい結果だと信じていますが、そうではありませんか?

+0

こんにちは@RayfenWindspear、説明のおかげで、ペンディングリクエストの増加は意図的に行われました。しかし、私は 'dispatch'の内部でブロックされた理由を理解できません。まったく別のチャンネル、' done < - w'と 'w.requests < - req'、' select'はディスパッチ中にもう一度実行されませんアクション、私は正しい? –

+1

@MaksymVolodin権利を実行しているルーチンが1つだけで、その特定のワーカーが '要求'を取得するのを待ってブロックしている間に、ワーカーは 'done < - w'を試みてブロックされます。私は実際にこれが非常に良い例であるとは確信していません。なぜなら、非常に多くのチャンネルが関与し、直感的ではない方法で何が起こっているのかを追跡することは非常に難しいからです。私はチャンネルを何度も何度も繰り返していることを確認するために、チャンネルの定義を何度もチェックしなければならない。確かに厳しいコードです。しかし、それは正しくないことを意味しません。 – RayfenWindspear

関連する問題