2016-07-15 19 views
0

チャネルから受信したデータをチャネルのリストにブロードキャストしたいと思います。チャネルのリストは動的であり、実行フェーズ中に変更することができます。Goで複数のチャネルでチャネルをブロードキャスト

Goの新しい開発者として、私はこのコードを書いた。私はそれが私が欲しいもののためにかなり重いことが分かった。これを行うには良い方法がありますか? stopChannelStop方法を取り除く:

package utils 

import "sync" 

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels 
type StringChannelBroadcaster struct { 
    Source  chan string 
    Subscribers map[string]*StringChannelSubscriber 
    stopChannel chan bool 
    mutex  sync.Mutex 
    capacity uint64 
} 

// NewStringChannelBroadcaster creates a StringChannelBroadcaster 
func NewStringChannelBroadcaster(capacity uint64) (b *StringChannelBroadcaster) { 
    return &StringChannelBroadcaster{ 
     Source:  make(chan string, capacity), 
     Subscribers: make(map[string]*StringChannelSubscriber), 
     capacity: capacity, 
    } 
} 

// Dispatch starts dispatching message 
func (b *StringChannelBroadcaster) Dispatch() { 
    b.stopChannel = make(chan bool) 
    for { 
     select { 
     case val, ok := <-b.Source: 
      if ok { 
       b.mutex.Lock() 
       for _, value := range b.Subscribers { 
        value.Channel <- val 
       } 
       b.mutex.Unlock() 
      } 
     case <-b.stopChannel: 
      return 
     } 
    } 
} 

// Stop stops the Broadcaster 
func (b *StringChannelBroadcaster) Stop() { 
    close(b.stopChannel) 
} 

// StringChannelSubscriber defines a subscriber to a StringChannelBroadcaster 
type StringChannelSubscriber struct { 
    Key  string 
    Channel chan string 
} 

// NewSubscriber returns a new subsriber to the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) NewSubscriber() *StringChannelSubscriber { 
    key := RandString(20) 
    newSubscriber := StringChannelSubscriber{ 
     Key:  key, 
     Channel: make(chan string, b.capacity), 
    } 
    b.mutex.Lock() 
    b.Subscribers[key] = &newSubscriber 
    b.mutex.Unlock() 

    return &newSubscriber 
} 

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) RemoveSubscriber(subscriber *StringChannelSubscriber) { 
    b.mutex.Lock() 
    delete(b.Subscribers, subscriber.Key) 
    b.mutex.Unlock() 
} 

は、私はあなたがそれを少し簡略化することができると思い

ジュリアン

+0

低レベルの操作のまわりに構文的なラッパーがないため、goコードが「重い」と感じることがあります。これは私の普通のアプローチのようです。あなたは "軽い"と何を見たいですか? –

答えて

1

、ありがとうございます。 Stopを呼び出すのではなくSourceを閉じるだけで、終了するにはDispatch(okはfalse)であることを検出するだけです(実際にソースチャネルの範囲を指定することができます)。

Dispatchを取り除いて、NewStringChannelBroadcasterのゴルーチンをforサイクルで開始するだけで、外部コードでディスパッチサイクルを個別に開始する必要はありません。

マップキーとしてチャネルタイプを使用できるため、マップは​​(マップ値を必要としないため空の構造体)になる可能性があります。したがって、NewSubscriberはチャンネルタイプのパラメータを取ることができます(または新しいチャンネルを作成して返す)ことができ、そのチャンネルをマップに挿入すると、ランダムな文字列やStringChannelSubscriberタイプは必要ありません。

私はまた、加入者のチャンネルを閉じるように、いくつかの改良を加え:

package main 

import "sync" 

import (
    "fmt" 
    "time" 
) 

// StringChannelBroadcaster broadcasts string data from a channel to multiple channels 
type StringChannelBroadcaster struct { 
    Source  chan string 
    Subscribers map[chan string]struct{} 
    mutex  sync.Mutex 
    capacity uint64 
} 

// NewStringChannelBroadcaster creates a StringChannelBroadcaster 
func NewStringChannelBroadcaster(capacity uint64) *StringChannelBroadcaster { 
    b := &StringChannelBroadcaster{ 
     Source:  make(chan string, capacity), 
     Subscribers: make(map[chan string]struct{}), 
     capacity: capacity, 
    } 
    go b.dispatch() 
    return b 
} 

// Dispatch starts dispatching message 
func (b *StringChannelBroadcaster) dispatch() { 
    // for iterates until the channel is closed 
    for val := range b.Source { 
     b.mutex.Lock() 
     for ch := range b.Subscribers { 
      ch <- val 
     } 
     b.mutex.Unlock() 
    } 
    b.mutex.Lock() 
    for ch := range b.Subscribers { 
     close(ch) 
     // you shouldn't be calling RemoveSubscriber after closing b.Source 
     // but it's better to be safe than sorry 
     delete(b.Subscribers, ch) 
    } 
    b.Subscribers = nil 
    b.mutex.Unlock() 
} 

func (b *StringChannelBroadcaster) NewSubscriber() chan string { 
    ch := make(chan string, b.capacity) 
    b.mutex.Lock() 
    if b.Subscribers == nil { 
     panic(fmt.Errorf("NewSubscriber called on closed broadcaster")) 
    } 
    b.Subscribers[ch] = struct{}{} 
    b.mutex.Unlock() 

    return ch 
} 

// RemoveSubscriber removes a subscrber from the StringChannelBroadcaster 
func (b *StringChannelBroadcaster) RemoveSubscriber(ch chan string) { 
    b.mutex.Lock() 
    if _, ok := b.Subscribers[ch]; ok { 
     close(ch)     // this line does have to be inside the if to prevent close of closed channel, in case RemoveSubscriber is called twice on the same channel 
     delete(b.Subscribers, ch) // this line doesn't need to be inside the if 
    } 
    b.mutex.Unlock() 
} 

func main() { 
    b := NewStringChannelBroadcaster(0) 

    var toberemoved chan string 

    for i := 0; i < 3; i++ { 
     i := i 

     ch := b.NewSubscriber() 
     if i == 1 { 
      toberemoved = ch 
     } 
     go func() { 
      for v := range ch { 
       fmt.Printf("receive %v: %v\n", i, v) 
      } 
      fmt.Printf("Exit %v\n", i) 
     }() 
    } 

    b.Source <- "Test 1" 
    b.Source <- "Test 2" 
    // This is a race condition: the second reader may or may not receive the first two messages. 
    b.RemoveSubscriber(toberemoved) 
    b.Source <- "Test 3" 

    // let the reader goroutines receive the last message 
    time.Sleep(2 * time.Second) 

    close(b.Source) 

    // let the reader goroutines write close message 
    time.Sleep(1 * time.Second) 
} 

https://play.golang.org/p/X-NcikvbDM

編集:私はSourceを閉じた後RemoveSubscriberを呼び出すときにパニックを修正するために、あなたの編集を追加しましたが、あなたはするべきでありませんそれをやっているのではなく、構造体とその中のすべてがチャンネルが閉じられた後にガベージコレクションされるようにするべきです。 Sourceを閉じた後に呼び出された場合は、NewSubscriberにパニックを追加しました。以前はこれを行うことができました。作成されたチャンネルとおそらくそのチャンネルで永遠にブロックされるゴルーチンが漏れてしまいます。

既に閉鎖されている放送事業者にNewSubscriber(またはRemoveSubscriber)と電話をかけることができます。これは、放送局を抱えてはいけないので、コードにエラーがある可能性があります。

関連する問題