私は、1 Sender(ディスパッチャ)がM(Worker)goルーチンにジョブを送る単純なワーカー・プール・アルゴリズムを実装しています。それは最初のアイドル作業者に使用可能なジョブを割り当てるチャンネルのチャンネルを使用していることを示します。Goで安全にチャンちゃんTを閉じるには?
// builds the pool
func NewWorkerPool(maxWorkers int) WorkerPool {
pool := make(chan chan Job, maxWorkers)
workers := make([]Worker, 0)
return WorkerPool{
WorkerPool: pool,
Workers: workers,
maxWorkers: maxWorkers,
waitGroup: sync.WaitGroup{}}
}
// Starts the WorkerPool
func (p *WorkerPool) Run(queue chan Job) {
w := p.waitGroup
// starting n number of workers
for i := 0; i < p.maxWorkers; i++ {
worker := NewWorker(p.WorkerPool)
p.Workers = append(p.Workers, worker)
w.Add(1)
worker.Start(&w)
}
go p.dispatch(queue)
}
// dispatches a job to be handled by an idle Worker of the pool
func (p *WorkerPool) dispatch(jobQueue chan Job) {
for {
select {
case job := <-jobQueue:
// a model request has been received
go func(job Job) {
// try to obtain a worker model channel that is available.
// this will block until a worker is idle
jobChannel := <-p.WorkerPool
// dispatch the model to the worker model channel
jobChannel <- job
}(job)
}
}
}
// checks if a Worker Pool is open or closed - If we can recieve on the channel then it is NOT closed
func (p *WorkerPool) IsOpen() bool {
_, ok := <-p.WorkerPool
return ok
}
労働者開始と停止方法
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start(wg *sync.WaitGroup) {
go func() {
defer wg.Done()
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
result := job.Run()
job.ReturnChannel <- result
// once result is returned close the job output channel
close(job.ReturnChannel)
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
今、私が使用してプールを閉鎖しようとしています次の方法では、私はすべてのワーカーがシャットダウンするのを待つためにsync.WaitGroupを使用します:
// stops the Pool
func (p *WorkerPool) Stop() bool {
// stops all workers
for _, worker := range p.Workers {
worker.Stop()
}
p.waitGroup.Wait() //Wait for the goroutines to shutdown
close(p.WorkerPool)
more := p.IsOpen()
fmt.Printf(" more? %t", more)
return ok
}
//もっと印刷しますか?労働者は、私はまだこのような場合には欠けているものをオープンチャネルを、持って終了し、その後に近い(p.WorkerPool)を呼び出すようにするために、私は待っていてもTRUE
、どのようにそれに応じてチャンネルを閉鎖するには?