2016-05-25 67 views
4

Unixシステム上でPython3プロセスを常に実行していて、時折しか実行されない他のプロセスの名前付きパイプを使ってランダムにデータを送信できるようにしたい。名前付きパイプにデータがない場合は、他の処理を続行したいので、ブロックを使わずにデータがあるかどうかを確認する必要があります。Python:名前付きパイプにデータがあるかどうかを調べる

非ブロックフラグを設定しない限り、ブロックを開くことなくブロックを開く方法を理解できません。フラグを設定すると、読み込み前または読み込み中にパイプに書き込むとクラッシュします。

これは私がやることができたベストです:

import os 

fifo = "pipe_test.fifo" 
done = False 
fd = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK) 
while not done: 
    try: 
     s = os.read(fd, 1024) # buffer size may need tweaking 
     print(s) 
     done = True 
    except BlockingIOError as e: 
     pass 
os.close(fd) 

パイプ内のデータがありません場合、私はb""を取得し、それが終了します。パイプにデータがある場合、例外を一度取得し、再試行してからデータを取得します。私は何かが間違っているように見え、奇妙な競争条件に遭遇するかもしれない。これを行うにはより良い方法がありますか?

+1

ブロッキングパイプを待機する2番目のスレッドを使用できますか? – Jasper

+0

@ジャスパー私はそれを行うことができました。それも理想的ではありませんが、私はそれが良いと認めます。また、低レベルの 'os'の代わりにPythonのファイルライブラリを使用することができます。これにより、改行などまで読み上げることができます。ありがとう、私は別のスレッドなしでチェックする方法がない限り、私のコードを変更するつもりです。 – sudo

答えて

2

彼らはデータグラムをサポートするので、私は、代わりにあなたがクライアントのコードを変更することができる場合名前付きパイプを使用しますが、UNIX domain socketsません:

import errno, fcntl, os, socket 

サーバー:

# bind socket 
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 
sock.bind('pipe_test.fifo') 
# set socket non-blocking 
fcntl.fcntl(sock.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) 

# get a datagram 
try: 
    datagram = sock.recv(1024) 
except (OSError, socket.error) as ex: 
    if ex.errno not in (errno.EINTR, errno.EAGAIN): 
     raise 
else: 
    print('Datagram: %r' % datagram) 

クライアント:

sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 
sock.sendto('Hello!', 'pipe_test.fifo') 

しかし、代わりにmultithreadingを調べるとよいでしょう。 fで非ブロッキングソケットを使用しています。

+0

私はデータグラムについて知らなかった。それは便利です。しかし、あなたは私と同じようにエラーを投げて捕まえているようです。それは安全ですか? @ sudo、 – sudo

+0

です。受信待ちのデータグラムがなければ、EAGAINがスローされます。シグナルが受信を中断した場合、EINTRがスローされます。後者の場合は、非ブロッキングソケットでは不可能ではありませんが、安全であることは害ではありません。可能性のあるエラーは次のとおりです。読み取りバッファが少なすぎます(ただし、データはわかっています)。また、クライアントはあまりにも遅いため、データを送信できません。ソケットのバッファは、デフォルトで200kBまたは512パケットです(http://stackoverflow.com/a/22007520/416224)。 – kay

+1

ニース、良い音。これを使用するようにコードを更新します。 – sudo

0

これは実際の答えではありませんが、誰にとっても有用な場合は、別のスレッドでこれをやっています。

class QueryThread(threading.Thread): 

    def __init__(self, args=(), kwargs=None): 
     threading.Thread.__init__(self, args=(), kwargs=None) 
     self.daemon = True 
     self.buf = [] 
     if not general.f_exists("pipe"): 
      os.mkfifo("pipe") 

    def run(self): 
     f = open("pipe") 
     while True: 
      try: 
       query = next(f).replace("\n", "") 
       if query != "": 
        self.buf.append(query) 
        print("Read in new query from pipe: {}, buf = {}".format(query, self.buf)) 
      except StopIteration: # not a pipe error, just means no data is left, so time to re-open 
       f.close() 
       f = open("pipe") 
     f.close() 

    def get_query(self): 
     if len(self.buf) == 0: return "" 
     query = self.buf[0] 
     self.buf.__delitem__(0) 
     return query 

バッファで改行を区切ったメッセージを保持します。別のスレッドからget_queryメソッドを呼び出して、最後に受信したメッセージを取得することができます。

関連する問題