2016-11-17 7 views
0

私はプロセス間通信に新しいので、os.pipeos.forkの使い方をPythonで理解しようとしています。ループ内で同じパイプを再利用すると 'Broken Pipe'エラーが発生する

以下のコードでは、「Broken Pipe」という行のコメントを外すとエラーが発生します。それ以外の場合はうまくいきます。

アイデアは、子プロセスが終了するときにSIGCHLDハンドラを持ち、子のみの機能(run_child)と親のみの機能(sigchld_handler)が実行されるときにそれぞれのカウンタをインクリメントします。フォークされたプロセスは独自のバージョンのメモリを持ち、変更は親プロセスには反映されないので、子プロセスがパイプを介して親プロセスにメッセージを送信し、親プロセス更新カウンタにプロセスを許可させます。

import os 
import signal 
import time 

class A(object): 
    def __init__(self): 
     self.parent = 0 
     self.child = 0 
     self._child_pid = None 

     self.rd , self.wr = os.pipe() 
     print self.rd , self.wr 
     signal.signal(signal.SIGCHLD, self.sigchld_handler) 

    def sigchld_handler(self, a, b): 
     self.parent += 1 
     print "Main run count : (parent) ", self.parent 
     #rf = os.fdopen(self.rd, 'r') 
     #self.child = int(rf.read()) 
     #rf.close() 
     self._child_pid = None 

    def run_child(self): 
     self.child += 1 
     print "Main run count (child) : ", self.child 
     print "Running in child : " , os.getpid() 
     wr = os.fdopen(self.wr,'w') 
     text = "%s" % (self.child) 
     print "C==>", text 
     wr.write(text) 
     wr.close() 
     os._exit(os.EX_OK) 

    def run(self): 
     if self._child_pid: 
      print "Child Process", self._child_pid, " already running." 
     else: 
      self._child_pid = os.fork() 

      if not self._child_pid: 
       self.run_child() 

a = A() 
i = 0 
while True: 
    a.run() 
    time.sleep(4) 
    i += 1 
    if i > 5: 
     break 

最初の数回の反復の後に面白いエラーが発生します。誰かがなぜエラーが来るのか、これを解決するために何をすべきか説明してください。

EDIT 1:ex1ex2ex3 同様の例がいくつかあります。私は実際にそれらを学習のためにのみ使用していましたが、私の場合は、プロデューサ/コンシューマキューのように動作するループを実行するようにサンプルを拡張しています。私は、マルチプロセス/キューモジュールがPythonで利用できるようには良いアプローチではないかもしれないと理解していますが、ここで間違いを理解したいと思います。

EDIT 2(溶液):@S.kozlov's answerに基づい

、すべての通信のための新しいパイプを作成するためにコードを修正。ここに変更されたコードがあります。

import os 
import pdb 
import signal 
import time 

class A(object): 
    def __init__(self): 
     self.parent = 0 
     self.child = 0 
     self._child_pid = None 
     signal.signal(signal.SIGCHLD, self.sigchld_handler) 

    def sigchld_handler(self, a, b): 
     self.parent += 1 
     os.close(self.wr) 
     print "Main run count : (parent) ", self.parent 
     rd = os.fdopen(self.rd, 'r') 
     self.child = int(rd.read()) 
     self._child_pid = None 

    def run_child(self): 
     self.child += 1 
     print "Main run count (child) : ", self.child 
     print "Running in child : " , os.getpid() 
     os.close(self.rd) 
     wr = os.fdopen(self.wr, 'w') 
     text = "%s" % (self.child) 
     print "C==>", text 
     wr.write(text) 
     wr.close() 
     os._exit(os.EX_OK) 

    def run(self): 
     if self._child_pid: 
      print "Child Process", self._child_pid, " already running." 
     else: 
      self.rd , self.wr = os.pipe() 
      self._child_pid = os.fork() 

      if not self._child_pid: 
       self.run_child() 

a = A() 
i = 0 
while True: 
    a.run() 
    time.sleep(4) 
    i += 1 
    if i > 5: 
     break 

これで、このような出力が得られるはずです(何か)。

Main run count (child) : 1 
Running in child : 15752 
C==> 1 
Main run count : (parent) 1 
Main run count (child) : 2 
Running in child : 15753 
C==> 2 
Main run count : (parent) 2 
Main run count (child) : 3 
Running in child : 15754 
C==> 3 
Main run count : (parent) 3 
Main run count (child) : 4 
Running in child : 15755 
C==> 4 
Main run count : (parent) 4 
Main run count (child) : 5 
Running in child : 15756 
C==> 5 
Main run count : (parent) 5 
Main run count (child) : 6 
Running in child : 15757 
C==> 6 
Main run count : (parent) 6 
+0

エラーなしで私のために働いています。私は何度もそれをやりました。 – quantummind

+0

@quantummind:あなたはsigchld_handlerの "uncommenting"行の後に言います。 – ViFI

+0

破損したパイプは、読み取り終了が閉じられた後にパイプに書き込もうとすると発生します。私は読むことを試みるときに起こるとは思わない。 – Barmar

答えて

2

あなたのコードの問題点は、1パイプ数回を再利用しようとしているということであり、それは一般的には、パイプのための有効なケースではありません。 例外はあなたにちょうど言っているだけです: "ねえ、あなたは以前の実行でこのパイプを閉じました。パイプが閉じられると、それは閉じています。"

コードを変更して、各子のパイプを作成し、「親」に一端(読み込み)を格納し、子に別のパイプを渡すことができます。そうすればうまくいくはずです。

編集1.私はあなたのコードを「すべての子供のための1本のパイプ」について更新しました。それは良いコードの仕方ではありませんが、教育上の意味で役立つことを望みます。

import os 
import signal 
import time 


class A(object): 
    def __init__(self): 
     self.parent = 0 
     self.child = 0 
     self._child_pid = None 
     signal.signal(signal.SIGCHLD, self.sigchld_handler) 

    def sigchld_handler(self, a, b): 
     self.parent += 1 
     print "Main run count : (parent) ", self.parent 
     os.close(self.wr) 
     rf = os.fdopen(self.rd, 'r') 
     message = rf.read() 
     rf.close() 
     print "Code from child [", self._child_pid, "]: ", message 
     self.rd = None 
     self._child_pid = None 

    def run_child(self): 
     self.child += 1 
     print "Main run count (child) : ", self.child 
     print "Running in child : " , os.getpid() 
     os.close(self.rd) 
     wr = os.fdopen(self.wr, 'w') 
     text = "Hello from %s" % (self.child) 
     print "C==>", text 
     wr.write(text) 
     wr.close() 
     os._exit(os.EX_OK) 

    def run(self): 
     if self._child_pid: 
      print "Child Process", self._child_pid, " already running." 
     else: 
      rd, wr = os.pipe() 
      self.rd = rd 
      self.wr = wr 
      self._child_pid = os.fork() 

      if not self._child_pid: 
       self.run_child() 
a = A() 
i = 0 
while True: 
    a.run() 
    time.sleep(4) 
    i += 1 
    if i > 5: 
     break 
+0

Btw、私はパイプを開いたままにして、代わりにそれにいくつかのフラッシュ操作を行うことができます。あなたのコメントの後、私はエラーを修正するコードを修正しましたが、私はこのアプローチではあまりにも多くのファイル記述子を開いていると思います。それは良いアプローチですか? – ViFI

+1

私はあなたの意見を見ます。パイプはあまり柔軟ではないので、キューやソケットなどを使い始めるのです。パイプとUnixソケットの簡単な比較があります:http://stackoverflow.com/questions/9475442/unix-domain-socket- vs-named-pipes –

関連する問題