2013-06-05 11 views
11

私は、1人の消費者と多くのプロデューサーをGoで実装する方法を見てきました.- Concurrency in Goの古典的なfanIn関数です。Go:1人のプロデューサーの多くの消費者

私が欲しいのは、fanOut関数です。これは、値を読み取るチャネルをパラメータとして受け取り、この値のコピーを書き込むチャネルのスライスを返します。

これを実装する正しい/推奨方法はありますか?

答えて

13

あなたはこれを行う最良の方法を説明しましたが、ここではそれを行うコードの小さなサンプルがあります。

囲碁遊び場:https://play.golang.org/p/jwdtDXVHJk

package main 

import (
    "fmt" 
    "time" 
) 

func producer(iters int) <-chan int { 
    c := make(chan int) 
    go func() { 
     for i := 0; i < iters; i++ { 
      c <- i 
      time.Sleep(1 * time.Second) 
     } 
     close(c) 
    }() 
    return c 
} 

func consumer(cin <-chan int) { 
    for i := range cin { 
     fmt.Println(i) 
    } 
} 

func fanOut(ch <-chan int, size, lag int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int, lag) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func fanOutUnbuffered(ch <-chan int, size int) []chan int { 
    cs := make([]chan int, size) 
    for i, _ := range cs { 
     // The size of the channels buffer controls how far behind the recievers 
     // of the fanOut channels can lag the other channels. 
     cs[i] = make(chan int) 
    } 
    go func() { 
     for i := range ch { 
      for _, c := range cs { 
       c <- i 
      } 
     } 
     for _, c := range cs { 
      // close all our fanOut channels when the input channel is exhausted. 
      close(c) 
     } 
    }() 
    return cs 
} 

func main() { 
    c := producer(10) 
    chans := fanOutUnbuffered(c, 3) 
    go consumer(chans[0]) 
    go consumer(chans[1]) 
    consumer(chans[2]) 
} 

注意すべき重要な部分は、入力チャネルが枯渇した後、我々は出力チャンネルを閉じる方法です。また、出力チャンネルの1つが送信でブロックすると、他の出力チャンネルで送信を保持します。チャネルのバッファサイズを設定することによって、遅延の量を制御します。

+1

素晴らしいです!ありがとうございました。それは私を乱していたチャンネルの閉鎖でした。将来的にこれが必要な方々に感謝し、早急に言及しますので、ここでは実行中のバージョンがあります:http://play.golang.org/p/jwdtDXVHJk – Carl

2

以下このソリューションは少し不自然ですが、それは私の作品:

package main 

import (
    "fmt" 
    "time" 
    "crypto/rand" 
    "encoding/binary" 
) 

func handleNewChannels(arrchangen chan [](chan uint32), 
         intchangen chan (chan uint32)) { 
    currarr := []chan uint32{} 
    arrchangen <- currarr 
    for { 
     newchan := <-intchangen 
     currarr = append(currarr, newchan) 
     arrchangen <- currarr 
    } 
} 

func sendToChannels(arrchangen chan [](chan uint32)) { 
    tick := time.Tick(1 * time.Second) 
    currarr := <-arrchangen 
    for { 
     select { 
     case <-tick: 
      sent := false 
      var n uint32 
      binary.Read(rand.Reader, binary.LittleEndian, &n) 
      for i := 0 ; i < len(currarr) ; i++ { 
       currarr[i] <- n 
       sent = true 
      } 
      if sent { 
       fmt.Println("Sent generated ", n) 
      } 
     case newarr := <-arrchangen: 
      currarr = newarr 
     } 
    } 
} 
func handleChannel(tchan chan uint32) { 
    for { 
     val := <-tchan 
     fmt.Println("Got the value ", val) 
    } 
} 

func createChannels(intchangen chan (chan uint32)) { 
    othertick := time.Tick(5 * time.Second) 
    for { 
     <-othertick 
     fmt.Println("Creating new channel! ") 
     newchan := make(chan uint32) 
     intchangen <- newchan 
     go handleChannel(newchan) 
    } 
} 

func main() { 
    arrchangen := make(chan [](chan uint32)) 
    intchangen := make(chan (chan uint32)) 
    go handleNewChannels(arrchangen, intchangen) 
    go sendToChannels(arrchangen) 
    createChannels(intchangen) 
}