私はソケットプロトコル実装を単純化するためにReactive Extensionsをasync/awaitと組み合わせて使用しています。特定のメッセージが到着したときに実行されなければならないアクションがいくつかあります(たとえば、各pingメッセージに 'pong'を送信するなど)。また、特定の応答を非同期的に待たなければならない方法もあります。次の例では、このことを示していますasync/await、RXとLINQを使用した非同期メッセージ処理
private Subject<string> MessageReceived = new Subject<string>();
//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
MessageReceived.OnNext(message);
ProcessMessage(message);
}
public async Task<string> TestMethod()
{
var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
await SendMessage("ABC");
//some code...
//if response we are waiting for comes before next row, we miss it
return await expectedMessage;
}
のtestMethod()(その前にいくつかの他のメッセージがあるかもしれません)ソケットに「ABC」を送信し、例えば「DEF」を受信したとき続きます。
これはほとんど動作しますが、競合状態があります。このコードではメッセージが受信されないようです。return await expectedMessage;
これは問題です。メッセージがその前に到着することがあるためです。
あなたは 'expectedMessage'を待っているときにポンが送信されないことを意味しますか?その場合、 'OnReceiveMessage'の中に' MessageReceived.OnNext() 'を格納せずに' ping'メッセージを処理する必要があります。 –
実際には、その部分を除いています。それは正常に動作しています。 ProcessMessages()メソッドでは、pingメッセージに応答してpongを送信しますが、問題は非同期メソッドで特定のイベント(ソケットから受信したメッセージ)を待っています – Juha