2013-02-10 13 views
11

にそのコマンドの標準出力からのデータを受信します が正しく、私は次のプログラムを持っているコマンドに標準入力にデータを渡すとgolang

$ while true; do go run cat_thingy.go; echo ; done 



^C 
この結果は、仮想マシンのaptからUbuntu 12.04にgolang-goをインストールした後に表示されます(go version go1)。私は、Macbook Air(goバージョン1.01に行く)上のインストールを複製することができませんでした。それはある種の競争状態のようです。実際、私が睡眠をとった場合(1回* 2回)、自分のコードでランダムな睡眠を犠牲にしてこの問題は決して見られません。

私はコードで間違っていることがありますか、これはバグですか?バグの場合は修正されていますか?

UPDATE:可能な手がかり

私はCommand.Waitが、彼らはまだ未読のデータを持っている場合でも、猫のサブプロセスから/へ通信するためのパイプを閉じますことがわかりました。私は本当にそれを処理するための適切な方法についてはよく分かりません。 stdinへの書き込みが完了したときに通知するチャネルを作成できると思いますが、catプロセスが終了したかどうかを知る必要があり、stdoutパイプに何も書き込まれていないことを確認する必要があります。私はcmd.Process.Waitを使用してプロセスがいつ終了するかを知ることができることを知っていますが、cmd.exeを呼び出すことは安全ですか?

UPDATE:ここでは、コードの新しいカットだ

近づい。私はstdinへの書き込みとstdoutからの読書までこれが動作すると信じています。ストリームを使わずにstdoutを処理するgoroutineからio.Copyを置き換えれば、データを適切にストリームすることができると思います。

package main 

import "bytes" 
import "fmt" 
import "io" 
import "log" 
import "os/exec" 
import "runtime" 

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB 
const numInputBlocks = 6 

func main() { 
    runtime.GOMAXPROCS(5) 
    runCatFromStdin(populateStdin(numInputBlocks)) 
} 

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"} 
     for i := 0; i < numInputBlocks; i++ { 
      repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes() 
      fmt.Printf("%s\n", repeatedBytes) 
      io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength))) 
     } 
    } 
} 

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    go populate_stdin_func(stdin) 
    output_done_channel := make(chan bool) 
    go func() { 
     out_bytes := new(bytes.Buffer) 
     io.Copy(out_bytes, stdout) 
     fmt.Printf("%s\n", out_bytes) 
     fmt.Println(out_bytes.Len()) 
     fmt.Println(inputBufferBlockLength*numInputBlocks) 
     output_done_channel <- true 
    }() 
    <-output_done_channel 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

答えて

0

Go statements

文が 同じアドレス空間内で、 として独立した同時制御のスレッド、またはゴルーチンの関数やメソッド呼び出しの実行を開始し、「行きます」。

GoStmt = "go"式。

式はコールでなければなりません。関数の値とパラメーターは、呼び出し元のgoroutineで通常どおりに評価された ですが、通常の 呼び出しとは異なり、プログラムの実行は呼び出された関数が完了するまで まで待機しません。代わりに、関数は新しいゴルーチン で独立して実行を開始します。関数が終了すると、そのゴルーチンも 終了します。関数に戻り値がある場合は、関数が完了すると破棄されます( )。

無償のゴルーチンを関数呼び出しに変換します。

package main 

import (
    "bytes" 
    "io" 
    "log" 
    "os" 
    "os/exec" 
) 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    populate_stdin_func(stdin) 
    io.Copy(os.Stdout, stdout) 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 
+2

あなたのコードが動作するものの@peterSOが言ったというのがもうひとつの方法です。 goroutineを関数呼び出しに変更すると、一般的には機能しません。一般的なケースでは、catプロセスが通信するために使用するパイプには一定のサイズのバッファがあります。たとえば、stdinパイプには特定のバッファがあります。バッファーがいっぱいになると、パイプへの書き込みがブロックされます。 Linuxでは、バッファーサイズは64KiBだと私は信じています。 stdoutの猫パイプにも同様の問題があります。メインコードでブロックI/Oを実行すると、それらのブロッキングコールがメインコードをブロックすることになります。 –

4

ここには動作する最初のコードのバージョンがあります。 sync.WaitGroupを追加して、コマンドを閉じる前にgoルーチンを送受信し終えることに注意してください。

package main 

import (
    "bytes" 
    "io" 
    "log" 
    "os" 
    "os/exec" 
    "sync" 
    "time" 
) 

func main() { 
    runCatFromStdinWorks(populateStdin("aaa\n")) 
    runCatFromStdinWorks(populateStdin("bbb\n")) 
} 

func populateStdin(str string) func(io.WriteCloser) { 
    return func(stdin io.WriteCloser) { 
     defer stdin.Close() 
     io.Copy(stdin, bytes.NewBufferString(str)) 
    } 
} 

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) { 
    cmd := exec.Command("cat") 
    stdin, err := cmd.StdinPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    stdout, err := cmd.StdoutPipe() 
    if err != nil { 
     log.Panic(err) 
    } 
    err = cmd.Start() 
    if err != nil { 
     log.Panic(err) 
    } 
    var wg sync.WaitGroup 
    wg.Add(2) 
    go func() { 
     defer wg.Done() 
     populate_stdin_func(stdin) 
    }() 
    go func() { 
     defer wg.Done() 
     time.Sleep(5 * time.Second) 
     io.Copy(os.Stdout, stdout) 
    }() 
    wg.Wait() 
    err = cmd.Wait() 
    if err != nil { 
     log.Panic(err) 
    } 
} 

(これは私の例では、パイプのバッファがいっぱいになることはありませんので、;-)

+0

これは単に@peterSOが何を言ったかという別の方法ではありません。 catへの入力は、出力とは別のgoroutineで処理されるため、実際にパイプバッファを正しく処理します。私はまた、WaitGroupsは私が同期を行うのに使用したチャネルより少し良いと思う。私はまだ、パイプがcmd.Wait()の副作用として閉じられていることが比較的紛らわしいと感じています。それはプロセスが終了するまで起こらないので、実際には混乱します。 –

関連する問題