は、私は(Windowsの場合)、それを行うような方法である:
import wx, wx.lib.newevent, threading
import win32event, win32pipe, win32file, pywintypes, winerror
NewMessage, EVT_NEW_MESSAGE = wx.lib.newevent.NewEvent()
class MessageNotifier(threading.Thread):
pipe_name = r"\\.\pipe\named_pipe_demo"
def __init__(self, frame):
threading.Thread.__init__(self)
self.frame = frame
def run(self):
open_mode = win32pipe.PIPE_ACCESS_DUPLEX | win32file.FILE_FLAG_OVERLAPPED
pipe_mode = win32pipe.PIPE_TYPE_MESSAGE
sa = pywintypes.SECURITY_ATTRIBUTES()
sa.SetSecurityDescriptorDacl(1, None, 0)
pipe_handle = win32pipe.CreateNamedPipe(
self.pipe_name, open_mode, pipe_mode,
win32pipe.PIPE_UNLIMITED_INSTANCES,
0, 0, 6000, sa
)
overlapped = pywintypes.OVERLAPPED()
overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
while 1:
try:
hr = win32pipe.ConnectNamedPipe(pipe_handle, overlapped)
except:
# Error connecting pipe
pipe_handle.Close()
break
if hr == winerror.ERROR_PIPE_CONNECTED:
# Client is fast, and already connected - signal event
win32event.SetEvent(overlapped.hEvent)
rc = win32event.WaitForSingleObject(
overlapped.hEvent, win32event.INFINITE
)
if rc == win32event.WAIT_OBJECT_0:
try:
hr, data = win32file.ReadFile(pipe_handle, 64)
win32file.WriteFile(pipe_handle, "ok")
win32pipe.DisconnectNamedPipe(pipe_handle)
wx.PostEvent(self.frame, NewMessage(data=data))
except win32file.error:
continue
class Messages(wx.Frame):
def __init__(self):
wx.Frame.__init__(self, None)
self.messages = wx.TextCtrl(self, style=wx.TE_MULTILINE | wx.TE_READONLY)
self.Bind(EVT_NEW_MESSAGE, self.On_Update)
def On_Update(self, event):
self.messages.Value += "\n" + event.data
app = wx.PySimpleApp()
app.TopWindow = Messages()
app.TopWindow.Show()
MessageNotifier(app.TopWindow).start()
app.MainLoop()
テストはしていくつかのデータを送信することにより:
import win32pipe
print win32pipe.CallNamedPipe(r"\\.\pipe\named_pipe_demo", "Hello", 64, 0)
(あなたも、この場合の応答を取得)
問題が発生しているコードの例を投稿できますか –