1
私は、ネットワークプログラミングとイベント駆動型イベントを全く導入していません。しかし、マシン(サーバー)とクライアントマシン(テスト用)(コマンドライン)間のTCP接続を使用してpub-subスキームを正常に実装できました。しかし、実際にはTwistedでUNIXソケットを使用する必要があります。遅延の未処理エラー? TwistedでUNIXソケットを使用する
コードを実行しているとき、私は次のエラーを取得しています:ここで
Unhandled error in Deferred
はpub_sub.py
のための私のコードです:
"""
a networking implementation of PubSub using Twisted.
=============
PubSub Server
=============
A PubSub server listens for subscription requests and publish commands, and, when
published to, sends data to subscribers. All incoming and outgoing requests are
encoded in JSON.
A Subscribe request looks like this:
{
"command": "subscribe",
"topic": "hello"
}
A Publish request looks like this:
{
"command": "publish",
"topic": "hello",
"data": {
"world": "WORLD"
}
}
When the server receives a Publish request, it will send the 'data' object to all
subscribers of 'topic'.
"""
import argparse
import json
import logging
from collections import defaultdict
from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.internet.endpoints import UNIXClientEndpoint, UNIXServerEndpoint, \
connectProtocol
from twisted.internet.protocol import Protocol, Factory
class PubSubProtocol(Protocol):
def __init__(self, topics):
self.topics = topics
self.subscribed_topic = None
def connectionLost(self, reason):
print("Connection lost: {}".format(reason))
if self.subscribed_topic:
self.topics[self.subscribed_topic].remove(self)
def dataReceived(self, data):
print("Data received: {}".format(data))
try:
request = json.loads(data)
except ValueError:
logging.debug("ValueError on deconding incoming data. "
"Data: {}".format(data), exc_info=True)
self.transport.loseConnection()
return
if request['command'] == 'subscribe':
self.handle_subscribe(request['topic'])
elif request['command'] == 'publish':
self.handle_publish(request['topic'], request['data'])
def handle_subscribe(self, topic):
print("Subscribed to topic: {}".format(topic))
self.topics[topic].add(self)
self.subscribed_topic = topic
def handle_publish(self, topic, data):
request = json.dumps(data)
for protocol in self.topics[topic]:
protocol.transport.write(request)
print("Publish sent for topic: {}".format(topic))
class PubSubFactory(Factory):
def __init__(self):
self.topics = defaultdict(set)
def buildProtocol(self, addr):
return PubSubProtocol(self.topics)
class PublisherProtocol(Protocol):
"""
Publish protocol for sending data to client, i.e. front-end web GUI.
"""
def __init__(self, topic, **kwargs):
self.topic = topic
self.kwargs = kwargs
def connectionMade(self):
request = json.dumps({
'command': 'publish',
'topic': self.topic,
'data': self.kwargs,
})
self.transport.write(request)
self.transport.loseConnection()
class SubscriberProtocol(Protocol):
"""
Subscriber protocol for client sending a request to subscribe to a specific
topic.
"""
def __init__(self, topic, callback):
self.topic = topic
self.callback = callback
def connectionMade(self):
request = json.dumps({
'command': 'subscribe',
'topic': self.topic,
})
self.transport.write(request)
def dataReceived(self, data):
kwargs = json.loads(data)
self.callback(**kwargs)
class PubSub(object):
def __init__(self, path='./.sock'):
self.path = FilePath(path)
self.reactor = reactor
def _make_connection(self, protocol):
endpoint = UNIXClientEndpoint(reactor, self.path)
connection = connectProtocol(endpoint, protocol)
def subscribe(self, topic, callback):
"""
Subscribe 'callback' callable to 'topic'.
"""
sub = SubscriberProtocol(topic, callback)
self._make_connection(sub)
def publish(self, topic, **kwargs):
"""
Publish 'kwargs' to 'topic', calling all callables subscribed to 'topic'
with the arguments specified in '**kwargs'.
"""
pub = PublisherProtocol(topic, **kwargs)
self._make_connection(pub)
def run(self):
"""
Convenience method to start the Twisted event loop.
"""
self.reactor.run()
def main():
path = FilePath("./.sock")
endpoint = UNIXServerEndpoint(reactor, path)
endpoint.listen(PubSubFactory())
reactor.run()
if __name__ == '__main__':
main()
すべてのヘルプは非常に高く評価されるだろう私が間違ってやっていること。
は、あなたがWindows上でソフトウェアを実行しているように見える
ブライアン
ありがとうございます!私はこの昨晩、Linuxマシン上で同じコードを実行し、すべてうまくいきました。私はあなたの助けに感謝します :) – Brian