私はプロセス間通信に新しいので、os.pipe
とos.fork
の使い方をPythonで理解しようとしています。ループ内で同じパイプを再利用すると 'Broken Pipe'エラーが発生する
以下のコードでは、「Broken Pipe」という行のコメントを外すとエラーが発生します。それ以外の場合はうまくいきます。
アイデアは、子プロセスが終了するときにSIGCHLDハンドラを持ち、子のみの機能(run_child)と親のみの機能(sigchld_handler)が実行されるときにそれぞれのカウンタをインクリメントします。フォークされたプロセスは独自のバージョンのメモリを持ち、変更は親プロセスには反映されないので、子プロセスがパイプを介して親プロセスにメッセージを送信し、親プロセス更新カウンタにプロセスを許可させます。
import os
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
self.rd , self.wr = os.pipe()
print self.rd , self.wr
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
print "Main run count : (parent) ", self.parent
#rf = os.fdopen(self.rd, 'r')
#self.child = int(rf.read())
#rf.close()
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
wr = os.fdopen(self.wr,'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
最初の数回の反復の後に面白いエラーが発生します。誰かがなぜエラーが来るのか、これを解決するために何をすべきか説明してください。
EDIT 1:ex1、ex2、ex3: 同様の例がいくつかあります。私は実際にそれらを学習のためにのみ使用していましたが、私の場合は、プロデューサ/コンシューマキューのように動作するループを実行するようにサンプルを拡張しています。私は、マルチプロセス/キューモジュールがPythonで利用できるようには良いアプローチではないかもしれないと理解していますが、ここで間違いを理解したいと思います。
EDIT 2(溶液):@S.kozlov's answerに基づい
、すべての通信のための新しいパイプを作成するためにコードを修正。ここに変更されたコードがあります。
import os
import pdb
import signal
import time
class A(object):
def __init__(self):
self.parent = 0
self.child = 0
self._child_pid = None
signal.signal(signal.SIGCHLD, self.sigchld_handler)
def sigchld_handler(self, a, b):
self.parent += 1
os.close(self.wr)
print "Main run count : (parent) ", self.parent
rd = os.fdopen(self.rd, 'r')
self.child = int(rd.read())
self._child_pid = None
def run_child(self):
self.child += 1
print "Main run count (child) : ", self.child
print "Running in child : " , os.getpid()
os.close(self.rd)
wr = os.fdopen(self.wr, 'w')
text = "%s" % (self.child)
print "C==>", text
wr.write(text)
wr.close()
os._exit(os.EX_OK)
def run(self):
if self._child_pid:
print "Child Process", self._child_pid, " already running."
else:
self.rd , self.wr = os.pipe()
self._child_pid = os.fork()
if not self._child_pid:
self.run_child()
a = A()
i = 0
while True:
a.run()
time.sleep(4)
i += 1
if i > 5:
break
これで、このような出力が得られるはずです(何か)。
Main run count (child) : 1
Running in child : 15752
C==> 1
Main run count : (parent) 1
Main run count (child) : 2
Running in child : 15753
C==> 2
Main run count : (parent) 2
Main run count (child) : 3
Running in child : 15754
C==> 3
Main run count : (parent) 3
Main run count (child) : 4
Running in child : 15755
C==> 4
Main run count : (parent) 4
Main run count (child) : 5
Running in child : 15756
C==> 5
Main run count : (parent) 5
Main run count (child) : 6
Running in child : 15757
C==> 6
Main run count : (parent) 6
エラーなしで私のために働いています。私は何度もそれをやりました。 – quantummind
@quantummind:あなたはsigchld_handlerの "uncommenting"行の後に言います。 – ViFI
破損したパイプは、読み取り終了が閉じられた後にパイプに書き込もうとすると発生します。私は読むことを試みるときに起こるとは思わない。 – Barmar