2017-10-18 4 views
-1

私は、各APIエンドポイントでできるだけ多くのhttp requestsを処理するワーカープール/ジョブキューシステムを構築しようとしています。私はこのexampleを調べて、別のエンドポイントにpool/jobqueueを拡張する方法を理解できないという問題に遭遇したことを除いて、それはうまく動作しました。Golang HTTPリクエストワーカープール

シナリオでは、異なるエンドポイントと要求の種類に応じて百万分の1リクエスト/分を持つGolang httpサーバーをスケッチしましょう。GET & POST ETC

この概念をどのように拡張できますか?各エンドポイントに異なるワーカー・プールとジョブを作成する必要があります。または、別のジョブを作成して同じキューに入力し、同じプールでこれを処理できますか?

私は新しいAPIエンドポイントを作成しても、新しいワーカープールを作成する必要がないので、APIだけに集中することができます。しかし、パフォーマンスも非常に念頭に置いてあります。

私が作成しようとしているコードは、先ほどリンクした例から取られています。hereは、このコードを持つ他の誰かのgithub 'gist'です。

+2

Goのhttpパッケージは、着信接続ごとにgoルーチンを開始します。バックグラウンドジョブの処理について話していない限り、これは無駄な努力のようです。 – squiguy

+0

はいこれはバックグラウンド処理に適しています。いくつかは終了するためにしばらく時間がかかるかもしれないと私はむしろコントロールされていない量のgoroutinesを緩くしないでください –

+0

goroutinesの問題は何ですか?基本的には、非同期サポートを備えたジョブキューのビルドイン実装です。 –

答えて

0

なぜワーカープールが必要なのかはっきりしないのですか? goroutinesで十分ではないでしょうか?

リソースが限られている場合は、rates limitingの実装を検討することができます。なぜ単に必要に応じてルーチンを実行しないのではないのでしょうか?

他の人がどのように良いことをするかを学ぶのが最も良い方法です。

は移動のためのhttps://github.com/valyala/fasthttp

高速HTTPパッケージを見てください。高性能のためにチューニングされました。ホット・パスのメモリー割り当てをゼロにします。 net/httpよりも最大10倍高速です。

彼らは主張している:

を物理サーバーごとに複数1.5M同時キープアライブ接続から200K RPSに

を提供する非常に印象的であり、私はあなたが行うことができます疑うことpool/jobqueueの方が良いでしょう。

1

前のこと:HTTPサーバー(Goの標準サーバーとともかく)を実行している場合、サーバーを停止して再起動することなく、ゴルーチンの数を制御することはできません。それぞれのリクエストは少なくとも1つのゴルーチンを開始し、それについて何もできません。良いことは、goroutinesはとても軽いので、これは通常問題ではないということです。しかし、厳しい作業をしているgoroutineの数を抑えたいというのは、まったく合理的です。

機能を含めて、チャンネルに任意の値を入力できます。だから、目標がHTTPハンドラにコードを書かなければならないのであれば、仕事は閉鎖されていなければなりません。労働者は自分が何をしているのかを知らない(または気にしません)。

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan func() 
var smallPool chan func() 

func main() { 
    // Start two different sized worker pools (e.g., for different workloads). 
    // Cancelation and graceful shutdown omited for brevity. 

    largePool = make(chan func(), 100) 
    smallPool = make(chan func(), 10) 

    for i := 0; i < 100; i++ { 
      go func() { 
        for f := range largePool { 
          f() 
        } 
      }() 
    } 

    for i := 0; i < 10; i++ { 
      go func() { 
        for f := range smallPool { 
          f() 
        } 
      }() 
    } 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) // naming things is hard, okay? 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    // Imagine a JSON body containing a URL that we are expected to fetch. 
    // Light work that doesn't consume many of *our* resources and can be done 
    // in bulk, so we put in in the large pool. 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      largePool <- func() { 
        http.Get(job.URL) 
        // Do something with the response 
      } 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    // The request body is an image that we want to do some fancy processing 
    // on. That's hard work; we don't want to do too many of them at once, so 
    // so we put those jobs in the small pool. 

    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      smallPool <- func() { 
        processImage(b) 
      } 
    }() 
    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 

これは、ポイントを取得するための非常に簡単な例です。あなたのワーカープールのセットアップ方法はそれほど重要ではありません。あなたは巧妙な仕事の定義が必要です。上の例ではクロージャですが、たとえばJobインターフェイスを定義することもできます。

type Job interface { 
    Do() 
} 

var largePool chan Job 
var smallPool chan Job 

今、私は "単純な" 全体ワーカープールのアプローチを呼び出すことはありません。ゴルーチンの数を制限することが目標だと言いました。それは全く労働者を必要としません。リミッターが必要です。上記の例と同じですが、チャネルをセマフォーとして使用して並行性を制限しています。

package main 

import (
    "encoding/json" 
    "io/ioutil" 
    "net/http" 
) 

var largePool chan struct{} 
var smallPool chan struct{} 

func main() { 
    largePool = make(chan struct{}, 100) 
    smallPool = make(chan struct{}, 10) 

    http.HandleFunc("/endpoint-1", handler1) 
    http.HandleFunc("/endpoint-2", handler2) 

    http.ListenAndServe(":8080", nil) 
} 

func handler1(w http.ResponseWriter, r *http.Request) { 
    var job struct{ URL string } 

    if err := json.NewDecoder(r.Body).Decode(&job); err != nil { 
      http.Error(w, err.Error(), http.StatusBadRequest) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(largePool) light-work 
      // goroutines running. 
      largePool <- struct{}{} 
      defer func() { <-largePool }() // Let everyone that we are done 

      http.Get(job.URL) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func handler2(w http.ResponseWriter, r *http.Request) { 
    b, err := ioutil.ReadAll(r.Body) 
    if err != nil { 
      http.Error(w, err.Error(), http.StatusInternalServerError) 
      return 
    } 

    go func() { 
      // Block until there are fewer than cap(smallPool) hard-work 
      // goroutines running. 
      smallPool <- struct{}{} 
      defer func() { <-smallPool }() // Let everyone that we are done 

      processImage(b) 
    }() 

    w.WriteHeader(http.StatusAccepted) 
} 

func processImage(b []byte) {} 
0

前述のように、各リクエストハンドラは少なくとも1つのゴルーチンで動作します。

ただし、必要に応じてバックエンド並列タスクにワーカープールを使用することはできます。例えば、あなたのHttp Handler関数のいくつかが他の外部apisへの呼び出しをトリガーし、その結果をまとめて集計しているとしましょう。この場合、呼び出しの順番は関係ありません。これはワーカープールを活用して、

サンプルコードスニペット:

// build empty response 
    capacity := config.GetIntProperty("defaultListCapacity") 
    list := model.NewResponseList(make([]model.Response, 0, capacity), 1, 1, 0) 

    // search providers 
    providers := getProvidersByCountry(country) 

    // create a slice of jobResult outputs 
    jobOutputs := make([]<-chan job.JobResult, 0) 

    // distribute work 
    for i := 0; i < len(providers); i++ { 
     job := search(providers[i], m) 
     if job != nil { 
      jobOutputs = append(jobOutputs, job.ReturnChannel) 
      // Push each job onto the queue. 
      GetInstance().JobQueue <- *job 
     } 
    } 

    // Consume the merged output from all jobs 
    out := job.Merge(jobOutputs...) 
    for r := range out { 
     if r.Error == nil { 
      mergeSearchResponse(list, r.Value.(*model.ResponseList)) 
     } 
    } 
    return list 

彼らは労働者のゴルーチンに各タスクをディスパッチ並行して実行させるためで作業。 「汎用」タスクを非同期的に実行する作業者プールの完全な例:https://github.com/guilhebl/go-offer/blob/master/offer/repo.go

ワーカープールlib使用:https://github.com/guilhebl/go-worker-pool

関連する問題