2012-06-18 8 views
14

私がしたいのは、いくつかのプロデューサー・ゴルーチン(そのうちのいくつかは完了していてもいなくてもよい)とコンシューマー・ルーチンです。問題はそのかっこ内の警告です - 回答を返す総数はわかりません。Goのプロデューサー/コンシューマーの最も慣れ親しんだイディオムは何ですか?

それでは、私がやりたいことはこれです:

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int) { 
    // May or may not produce. 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
} 

func main() { 
    c := make(chan int, 10) 
    for i := 0; i < 10; i++ { 
    go producer(c, signal) 
    } 

    // If we include a close, then that's WRONG. Chan will be closed 
    // but a producer will try to write to it. Runtime error. 
    close(c) 

    // If we don't close, then that's WRONG. All goroutines will 
    // deadlock, since the range keyword will look for a close. 
    for num := range c { 
    fmt.Printf("Producer produced: %d\n", num) 
    } 
    fmt.Println("All done.") 
} 

だから、問題は、私はそれが間違って閉じた場合、私は近くにいない場合、である - それはまだ間違っている(コード内のコメントを参照してください)。

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int, signal chan bool) { 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
    signal <- true 
} 

func main() { 
    c := make(chan int, 10) 
    signal := make(chan bool, 10) 
    for i := 0; i < 10; i++ { 
    go producer(c, signal) 
    } 

    // This is basically a 'join'. 
    num_done := 0 
    for num_done < 10 { 
    <- signal 
    num_done++ 
    } 
    close(c) 

    for num := range c { 
    fmt.Printf("Producer produced: %d\n", num) 
    } 
    fmt.Println("All done.") 
} 

そして、それは完全に私が欲しいものを行います。

今、解決策は、すべてのプロデューサがに書き込むことを、アウトオブバンド信号チャネルになります!しかし私には一口のようだ。私の質問は、簡単なやり方で似たようなことをさせるイディオム/トリックはありますか?

私はここを見ていた:http://golang.org/doc/codewalk/sharemem/ をそして、それは(mainの開始時に初期化)completeちゃんのように思えるの範囲で使用されるが閉じられることはありません。私はどのように理解できません。

誰かが洞察力を持っているなら、私はそれを高く評価します。乾杯!


編集:fls0815には答えがあり、また、クローズレスチャンネル範囲の仕組みについての質問にも答えました。

(fls0815親切に供給された符号の前に行われる)仕事をするmodifed上記の私のコードは:

package main 

import (
    "fmt" 
    "math/rand" 
    "sync" 
) 

var wg_prod sync.WaitGroup 
var wg_cons sync.WaitGroup 

func producer(c chan int) { 
    success := rand.Float32() > 0.5 
    if success { 
    c <- rand.Int() 
    } 
    wg_prod.Done() 
} 

func main() { 
    c := make(chan int, 10) 
    wg_prod.Add(10) 
    for i := 0; i < 10; i++ { 
    go producer(c) 
    } 

    wg_cons.Add(1) 
    go func() { 
    for num := range c { 
     fmt.Printf("Producer produced: %d\n", num) 
    } 
    wg_cons.Done() 
    }() 

    wg_prod.Wait() 
    close(c) 
    wg_cons.Wait() 
    fmt.Println("All done.") 
} 

答えて

14

のみ生産者がチャンネルを閉じる必要があります。プロデューサーが開始されると、結果のチャンネルを反復するコンシューマー(range)を呼び出して、目標を達成することができます。メインスレッドでは、消費者/生産者が作業を終了するまで待つ(sync.WaitGroup参照)。プロデューサーの作業が終了したら、チャンネルを閉じて消費者を強制的に終了させます(rangeは、チャンネルが閉じられ、バッファされたアイテムが残っていないと終了します)。

コード例:我々は、複数のプロデューサーを持っているので

package main 

import (
    "log" 
    "sync" 
    "time" 
    "math/rand" 
    "runtime" 
) 

func consumer() { 
    defer consumer_wg.Done() 

    for item := range resultingChannel { 
     log.Println("Consumed:", item) 
    } 
} 

func producer() { 
    defer producer_wg.Done() 

    success := rand.Float32() > 0.5 
    if success { 
     resultingChannel <- rand.Int() 
    } 
} 

var resultingChannel = make(chan int) 
var producer_wg sync.WaitGroup 
var consumer_wg sync.WaitGroup 

func main() { 
    rand.Seed(time.Now().Unix()) 

    for c := 0; c < runtime.NumCPU(); c++ { 
     producer_wg.Add(1) 
     go producer() 
    } 

    for c := 0; c < runtime.NumCPU(); c++ { 
     consumer_wg.Add(1) 
     go consumer() 
    } 

    producer_wg.Wait() 

    close(resultingChannel) 

    consumer_wg.Wait() 
} 

私は主な機能にclose -statementを置く理由があります。上の例の1つのプロデューサでチャンネルを閉じると、すでに実行していた問題が発生します(クローズドチャンネルでの書き込み、理由は1人のプロデューサがデータを生成してしまう可能性があるためです)。プロデューサが残っていない場合にのみ、チャンネルを閉じる必要があります(したがって、プロデューサのみがチャンネルを閉じるという提案)。これは、Goでチャネルを構築する方法です。 Hereあなたは閉鎖チャンネルに関するいくつかのより多くの情報を見つけるでしょう。 sharemem例に関連


:この例では、何度も何度もリソースを再キューイングによって無限に実行AFAICS(から保留 - >完全に - >保留 - >完全な...など)。これがmain-funcの最後の繰り返しです。完了したリソースを受信し、Resource.Sleep()を使用してそれらを再キューして保留状態にします。完了したリソースがない場合、待機し、新しいリソースが完了するまでブロックします。したがって、チャネルが常に使用されているため、チャネルを閉じる必要はありません。

+0

こんにちは、あなたの答えに感謝 - それは私が探していたもの確かです。 「プロデューサーだけがチャネルを閉鎖する」というあなたの提案を拡大することができますか? - それは常識/コード意識のルールのように聞こえるが、技術的な理由があるかどうか疑問に思っていた(あなたがリストしたコードサンプルにはチャンネルを閉じる主な機能があるので)。再度、感謝します! – Will

+1

さらに詳しい情報を追加しました。 HTH。 – fls0815

+0

ああ、そうですね。おそらく、各プロデューサーがチャンネルを閉じることが許可されているかどうかをチェックしなければならないという厳しいルールかもしれないと考えました。私たちの例ではmain()の中でそれを閉じるのではなく、はるかに厄介な(より多くの不必要な検査を伴う)ことは明らかですが、私は事をする方法であることを心配していました。私はまだスタイルのベストプラクティスを感じようとしています。 – Will

0

これらの問題を解決する方法は常にたくさんあります。ここでは、Goで基本的な単純な同期チャネルを使用したソリューションを紹介します。バッファリングされたチャネル、終了チャネル、WaitGroupsなし。

それは本当にないそこまであなたの「一口」溶液からだ、と - 失望して申し訳ありません - ないというはるかに小さいです。消費者はそれを自分のゴルーチンに入れているので、消費者はプロデューサーがそれらを生産するときに数字を消費することができます。また、制作の「試行」が成功か失敗かで終わることを区別します。プロダクションが失敗した場合は、すぐに試行されます。成功した場合、試行はその数値が消費されるまで行われません。

package main 

import (
    "fmt" 
    "math/rand" 
) 

func producer(c chan int, fail chan bool) { 
    if success := rand.Float32() > 0.5; success { 
     c <- rand.Int() 
    } else { 
     fail <- true 
    } 
} 

func consumer(c chan int, success chan bool) { 
    for { 
     num := <-c 
     fmt.Printf("Producer produced: %d\n", num) 
     success <- true 
    } 
} 

func main() { 
    const nTries = 10 
    c := make(chan int) 
    done := make(chan bool) 
    for i := 0; i < nTries; i++ { 
     go producer(c, done) 
    } 
    go consumer(c, done) 

    for i := 0; i < nTries; i++ { 
     <-done 
    } 
    fmt.Println("All done.") 
} 
0

現存する回答によっていくつかのことが明らかにならないため、これを追加します。最初に、コードウォークの例の範囲ループは無限のイベントループに過ぎず、同じURLリストを再確認して更新し続けることができます。

次に、チャンネルはすべて単独で、です.Goの慣用的な消費者生産者キューです。チャネルをバックアップする非同期バッファのサイズは、バックプレッシャを取得する前にプロデューサがどれだけ生産できるかを決定します。先に競争する人や後ろに出る者がいなくても、ロックステップのプロデューサーコンシューマーを見るには、N = 0に設定します。そのままでは、N = 10は、ブロックする前にプロデューサーに最大10個の製品を生産させることになります。

最終、移動中に(あなたのためのルーチンを行く開始などの機能を、と通信し、制御コマンドを受け入れることについて/選択パターンを使用して)通信シーケンシャルprocesseesを書くためのいくつかの素晴らしいイディオムがあります。私はWaitGroupsを不器用なものと思っており、代わりに慣用的な例を見たいと思っています。

package main 

import (
    "fmt" 
    "time" 
) 

type control int 
const (
    sleep control = iota 
    die // receiver will close the control chan in response to die, to ack. 
) 

func (cmd control) String() string { 
    switch cmd { 
    case sleep: return "sleep" 
    case die: return "die" 
    } 
    return fmt.Sprintf("%d",cmd) 
} 

func ProduceTo(writechan chan<- int, ctrl chan control, done chan bool) { 
    var product int 
    go func() { 
     for { 
      select { 
     case writechan <- product: 
      fmt.Printf("Producer produced %v\n", product) 
      product++ 
     case cmd:= <- ctrl: 
      fmt.Printf("Producer got control cmd: %v\n", cmd) 
      switch cmd { 
      case sleep: 
       fmt.Printf("Producer sleeping 2 sec.\n") 
       time.Sleep(2000 * time.Millisecond) 
      case die: 
       fmt.Printf("Producer dies.\n") 
       close(done) 
       return 
      } 
      } 
     } 
    }() 
} 

func ConsumeFrom(readchan <-chan int, ctrl chan control, done chan bool) { 
    go func() { 
     var product int 
     for { 
      select { 
      case product = <-readchan: 
       fmt.Printf("Consumer consumed %v\n", product) 
      case cmd:= <- ctrl: 
       fmt.Printf("Consumer got control cmd: %v\n", cmd) 
       switch cmd { 
       case sleep: 
        fmt.Printf("Consumer sleeping 2 sec.\n") 
        time.Sleep(2000 * time.Millisecond) 
       case die: 
        fmt.Printf("Consumer dies.\n") 
        close(done) 
        return 
       } 

      } 
     } 
    }() 
} 

func main() { 

    N := 10 
    q := make(chan int, N) 

    prodCtrl := make(chan control) 
    consCtrl := make(chan control) 

    prodDone := make(chan bool) 
    consDone := make(chan bool) 


    ProduceTo(q, prodCtrl, prodDone) 
    ConsumeFrom(q, consCtrl, consDone) 

    // wait for a moment, to let them produce and consume 
    timer := time.NewTimer(10 * time.Millisecond) 
    <-timer.C 

    // tell producer to pause 
    fmt.Printf("telling producer to pause\n") 
    prodCtrl <- sleep 

    // wait for a second 
    timer = time.NewTimer(1 * time.Second) 
    <-timer.C 

    // tell consumer to pause 
    fmt.Printf("telling consumer to pause\n") 
    consCtrl <- sleep 


    // tell them both to finish 
    prodCtrl <- die 
    consCtrl <- die 

    // wait for that to actually happen 
    <-prodDone 
    <-consDone 
} 
0

fanIn機能でジェネレータパターンを使用すると、待機グループのない単純なバッファなしチャネルを使用できます。

ジェネレータパターンでは、各プロデューサがチャネルを返し、それを閉じる役割を果たします。 fanIn関数は、これらのチャネルを反復処理し、返された値を返す単一のチャネルに転送します。

コースの問題は、各チャネルが閉じられたときにファンイン機能は、チャネルタイプ(INT)のゼロ値を転送することです。

あなたはセンチネル値としてごチャネル型のゼロ値を使用して、彼らはゼロの値でない場合にのみファンインチャネルからの結果を用いて、それを回避することができます。

は、ここに例を示しますが

package main 

import (
    "fmt" 
    "math/rand" 
) 

const offset = 1 

func producer() chan int { 
    cout := make(chan int) 
    go func() { 
     defer close(cout) 
     // May or may not produce. 
     success := rand.Float32() > 0.5 
     if success { 
      cout <- rand.Int() + offset 
     } 
    }() 
    return cout 
} 

func fanIn(cin []chan int) chan int { 
    cout := make(chan int) 
    go func() { 
     defer close(cout) 
     for _, c := range cin { 
      cout <- <-c 
     } 
    }() 
    return cout 
} 

func main() { 
    chans := make([]chan int, 0) 
    for i := 0; i < 10; i++ { 
     chans = append(chans, producer()) 
    } 

    for num := range fanIn(chans) { 
     if num > offset { 
      fmt.Printf("Producer produced: %d\n", num) 
     } 
    } 
    fmt.Println("All done.") 
} 
関連する問題