前のこと:HTTPサーバー(Goの標準サーバーとともかく)を実行している場合、サーバーを停止して再起動することなく、ゴルーチンの数を制御することはできません。それぞれのリクエストは少なくとも1つのゴルーチンを開始し、それについて何もできません。良いことは、goroutinesはとても軽いので、これは通常問題ではないということです。しかし、厳しい作業をしているgoroutineの数を抑えたいというのは、まったく合理的です。
機能を含めて、チャンネルに任意の値を入力できます。だから、目標がHTTPハンドラにコードを書かなければならないのであれば、仕事は閉鎖されていなければなりません。労働者は自分が何をしているのかを知らない(または気にしません)。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan func()
var smallPool chan func()
func main() {
// Start two different sized worker pools (e.g., for different workloads).
// Cancelation and graceful shutdown omited for brevity.
largePool = make(chan func(), 100)
smallPool = make(chan func(), 10)
for i := 0; i < 100; i++ {
go func() {
for f := range largePool {
f()
}
}()
}
for i := 0; i < 10; i++ {
go func() {
for f := range smallPool {
f()
}
}()
}
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay?
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
// Imagine a JSON body containing a URL that we are expected to fetch.
// Light work that doesn't consume many of *our* resources and can be done
// in bulk, so we put in in the large pool.
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
largePool <- func() {
http.Get(job.URL)
// Do something with the response
}
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
// The request body is an image that we want to do some fancy processing
// on. That's hard work; we don't want to do too many of them at once, so
// so we put those jobs in the small pool.
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
smallPool <- func() {
processImage(b)
}
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
これは、ポイントを取得するための非常に簡単な例です。あなたのワーカープールのセットアップ方法はそれほど重要ではありません。あなたは巧妙な仕事の定義が必要です。上の例ではクロージャですが、たとえばJobインターフェイスを定義することもできます。
type Job interface {
Do()
}
var largePool chan Job
var smallPool chan Job
今、私は "単純な" 全体ワーカープールのアプローチを呼び出すことはありません。ゴルーチンの数を制限することが目標だと言いました。それは全く労働者を必要としません。リミッターが必要です。上記の例と同じですが、チャネルをセマフォーとして使用して並行性を制限しています。
package main
import (
"encoding/json"
"io/ioutil"
"net/http"
)
var largePool chan struct{}
var smallPool chan struct{}
func main() {
largePool = make(chan struct{}, 100)
smallPool = make(chan struct{}, 10)
http.HandleFunc("/endpoint-1", handler1)
http.HandleFunc("/endpoint-2", handler2)
http.ListenAndServe(":8080", nil)
}
func handler1(w http.ResponseWriter, r *http.Request) {
var job struct{ URL string }
if err := json.NewDecoder(r.Body).Decode(&job); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
go func() {
// Block until there are fewer than cap(largePool) light-work
// goroutines running.
largePool <- struct{}{}
defer func() { <-largePool }() // Let everyone that we are done
http.Get(job.URL)
}()
w.WriteHeader(http.StatusAccepted)
}
func handler2(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
go func() {
// Block until there are fewer than cap(smallPool) hard-work
// goroutines running.
smallPool <- struct{}{}
defer func() { <-smallPool }() // Let everyone that we are done
processImage(b)
}()
w.WriteHeader(http.StatusAccepted)
}
func processImage(b []byte) {}
Goのhttpパッケージは、着信接続ごとにgoルーチンを開始します。バックグラウンドジョブの処理について話していない限り、これは無駄な努力のようです。 – squiguy
はいこれはバックグラウンド処理に適しています。いくつかは終了するためにしばらく時間がかかるかもしれないと私はむしろコントロールされていない量のgoroutinesを緩くしないでください –
goroutinesの問題は何ですか?基本的には、非同期サポートを備えたジョブキューのビルドイン実装です。 –