2016-04-06 6 views
0

私はフロートの多くの配列をストリームするサーバーを実装しています。オーディオプロセスが独立していなければならないチャンネルとゴルーチンを使ってGoウェブサーバーを構成する方法は?

  • と入ってくるすべての要求がない場合でも、その仕事をする私の現在のアプローチまで待つことDataProcess関数を作る:私は次のことを達成するために、私のシステムを設計するためにいくつかの助けを必要としています。要求があります。
  • チャンネルは1回のリクエストに対してのみデータを与えることができるので、2回以上のリクエストでDataProcessで準備したデータをどのように取得できますか?
  • 実際にデータをストリームするには、リクエストハンドラはDataProcess全体が終了するのを待つことはできません。DataProcessの各繰り返しを完了するとすぐにハンドラデータを返すことができますか?

ご回答いただければ幸いです。これは私の現在の考えです:

package main 

import (
"fmt" 
"io" 
"net/http" 
"strconv" 
"time" 
) 

func main() { 
    c := AudioProcess() 
    handleHello := makeHello(c) 

    http.HandleFunc("/", handleHello) 
    http.ListenAndServe(":8000", nil) 
} 

func makeHello(c chan string) func(http.ResponseWriter, *http.Request) { 
    return func(w http.ResponseWriter, r *http.Request) { 
     for item := range c { // this loop runs when channel c is closed 
      io.WriteString(w, item) 
     } 
    } 
} 

func AudioProcess() chan string { 
    c := make(chan string) 
    go func() { 
     for i := 0; i <= 10; i++ { // Iterate the audio file 
      c <- strconv.Itoa(i) // have my frame of samples, send to channel c 
      time.Sleep(time.Second) 
      fmt.Println("send ", i) // logging 
     } 
     close(c) // done processing, close channel c 
     }() 
     return c 
    } 

答えて

0

私は私はあなたのユースケースを十分に認識していないよ、これはあなたの問題を解決する場合は、完全にわからないんだけど、それにもかかわらず、私は以下のソリューションを作ってみました。

私はGinをHTTPルータに使用しましたが、これは私にとってはより快適だったためですが、自分のものに合わせてコードを修正できると確信しています。私はこれを急いで(申し訳ありません)しました。私は気づいていない問題があるかもしれませんが、もしあれば教えてください。要するに

  1. 私はいくつかのClientの世話をManagerを作成しました。また、1つだけスレッドがいつでもclientsを変更していることを確認するためにsync.Mutexが含まれています。
  2. ランダム番号float64を生成するInitBackgroundTask()があり、Manager(存在する場合)のALL clientsに渡します。 clientsがない場合は、スリープ状態になります。
  3. インデックスハンドラは、クライアントの追加と削除を処理します。クライアントはUUIDによって識別されます。
  4. 現在、3つのことが起こり得る。 <-c.Writer.CloseNotify()チャネルを介して接続を切断すると、クライアントは自動的に削除されます(これによりメソッドが返され、deferが呼び出されるため)。次のバックグラウンドタスクでランダムなfloat64番号を受け取ることもできます。最後に、20歳代を受け取っていない場合は、終了することもできます。

私はここであなたの必要性についていくつかの仮定をしました(例えば、バックグラウンドタスクはY分ごとにXを返します)。より細かい穀物のストリーミングを探しているなら、代わりにウェブソケットを使うことをお勧めします(そして、以下のパターンは引き続き使用できます)。

質問がある場合はお知らせください。

コード:

package main 

import (
    "github.com/gin-gonic/gin" 
    "github.com/satori/go.uuid" 
    "log" 
    "math/rand" 
    "net/http" 
    "sync" 
    "time" 
) 

type Client struct { 
    uuid string 
    out chan float64 
} 

type Manager struct { 
    clients map[string]*Client 
    mutex sync.Mutex 
} 

func NewManager() *Manager { 
    m := new(Manager) 
    m.clients = make(map[string]*Client) 
    return m 
} 

func (m *Manager) AddClient(c *Client) { 
    m.mutex.Lock() 
    defer m.mutex.Unlock() 
    log.Printf("add client: %s\n", c.uuid) 
    m.clients[c.uuid] = c 
} 

func (m *Manager) DeleteClient(id string) { 
    m.mutex.Lock() 
    defer m.mutex.Unlock() 
    // log.Println("delete client: %s", c.uuid) 
    delete(m.clients, id) 
} 

func (m *Manager) InitBackgroundTask() { 
    for { 
     f64 := rand.Float64() 
     log.Printf("active clients: %d\n", len(m.clients)) 
     for _, c := range m.clients { 
      c.out <- f64 
     } 
     log.Printf("sent output (%+v), sleeping for 10s...\n", f64) 
     time.Sleep(time.Second * 10) 
    } 
} 

func main() { 
    r := gin.Default() 
    m := NewManager() 

    go m.InitBackgroundTask() 

    r.GET("/", func(c *gin.Context) { 
     cl := new(Client) 
     cl.uuid = uuid.NewV4().String() 
     cl.out = make(chan float64) 

     defer m.DeleteClient(cl.uuid) 
     m.AddClient(cl) 

     select { 
     case <-c.Writer.CloseNotify(): 
      log.Printf("%s : disconnected\n", cl.uuid) 
     case out := <-cl.out: 
      log.Printf("%s : received %+v\n", out) 
      c.JSON(http.StatusOK, gin.H{ 
       "output": out, 
      }) 
     case <-time.After(time.Second * 20): 
      log.Println("timed out") 
     } 
    }) 

    r.Run() 
} 

注:Chromeでこれをテストしている場合、あなたはリクエストが実際に行われるように、URLの末尾にランダムなパラメータを追加する必要がある場合があり、例えば?rand=001,?rand=002など。

関連する問題