2017-11-28 5 views
1

私は、1 Sender(ディスパッチャ)がM(Worker)goルーチンにジョブを送る単純なワーカー・プール・アルゴリズムを実装しています。それは最初のアイドル作業者に使用可能なジョブを割り当てるチャンネルのチャンネルを使用していることを示します。Goで安全にチャンちゃんTを閉じるには?

// builds the pool 
func NewWorkerPool(maxWorkers int) WorkerPool { 
    pool := make(chan chan Job, maxWorkers) 
    workers := make([]Worker, 0) 
    return WorkerPool{ 
     WorkerPool: pool, 
     Workers: workers, 
     maxWorkers: maxWorkers, 
     waitGroup: sync.WaitGroup{}} 
} 

// Starts the WorkerPool 
func (p *WorkerPool) Run(queue chan Job) { 
    w := p.waitGroup 

    // starting n number of workers 
    for i := 0; i < p.maxWorkers; i++ { 
     worker := NewWorker(p.WorkerPool) 
     p.Workers = append(p.Workers, worker) 
     w.Add(1) 
     worker.Start(&w) 
    } 

    go p.dispatch(queue) 
} 

// dispatches a job to be handled by an idle Worker of the pool 
func (p *WorkerPool) dispatch(jobQueue chan Job) { 
    for { 
     select { 
     case job := <-jobQueue: 
      // a model request has been received 
      go func(job Job) { 
       // try to obtain a worker model channel that is available. 
       // this will block until a worker is idle 
       jobChannel := <-p.WorkerPool 

       // dispatch the model to the worker model channel 
       jobChannel <- job 
      }(job) 
     } 
    } 
} 


// checks if a Worker Pool is open or closed - If we can recieve on the channel then it is NOT closed 
func (p *WorkerPool) IsOpen() bool { 
    _, ok := <-p.WorkerPool 
    return ok 
} 

労働者開始と停止方法

// Start method starts the run loop for the worker, listening for a quit channel in 
// case we need to stop it 
func (w Worker) Start(wg *sync.WaitGroup) { 
    go func() { 
     defer wg.Done() 
     for { 
      // register the current worker into the worker queue. 
      w.WorkerPool <- w.JobChannel 

      select { 
      case job := <-w.JobChannel: 
       // we have received a work request. 
       result := job.Run() 
       job.ReturnChannel <- result 

       // once result is returned close the job output channel 
       close(job.ReturnChannel) 

      case <-w.quit: 
       // we have received a signal to stop 
       return 
      } 
     } 
    }() 
} 

// Stop signals the worker to stop listening for work requests. 
func (w Worker) Stop() { 
    go func() { 
     w.quit <- true 
    }() 
} 

今、私が使用してプールを閉鎖しようとしています次の方法では、私はすべてのワーカーがシャットダウンするのを待つためにsync.WaitGroupを使用します:

// stops the Pool 
func (p *WorkerPool) Stop() bool { 
    // stops all workers 
    for _, worker := range p.Workers { 
     worker.Stop() 
    } 
    p.waitGroup.Wait() //Wait for the goroutines to shutdown 

    close(p.WorkerPool) 

    more := p.IsOpen() 

    fmt.Printf(" more? %t", more) 

    return ok 
} 

//もっと印刷しますか?労働者は、私はまだこのような場合には欠けているものをオープンチャネルを、持って終了し、その後に近い(p.WorkerPool)を呼び出すようにするために、私は待っていてもTRUE

、どのようにそれに応じてチャンネルを閉鎖するには?

答えて

1

チャネルを閉じると、それ以上チャネルに値が送信されないことを示します。これは、チャンネルの受信者に完了を伝えるのに便利です。

チャネルのデータはまだあなたがチャネルを閉じて、その後ところで

// Stop stops the Pool and free all the channels 
func (p *WorkerPool) Stop() bool { 
    // stops all workers 
    for _, worker := range p.Workers { 
     worker.Stop() 
    } 
    p.waitGroup.Wait() //Wait for the goroutines to shutdown 
    close(p.WorkerPool) 
    for channel := range p.WorkerPool { 
     fmt.Println("Freeing channel") //remove all the channels 
    } 
    more := p.IsOpen() 
    fmt.Printf(" more? %t", more) 

    return ok 
} 

を次のようにその中のすべてのチャンネルを削除する必要がありますどこのチャンネルがある場合、1をチェックする_, ok <-を使用することはできませんが存在します閉まっている。私は関数の別の名前を提案します

関連する問題