2017-03-28 19 views
0

私はソケットプロトコル実装を単純化するためにReactive Extensionsをasync/awaitと組み合わせて使用​​しています。特定のメッセージが到着したときに実行されなければならないアクションがいくつかあります(たとえば、各pingメッセージに 'pong'を送信するなど)。また、特定の応答を非同期的に待たなければならない方法もあります。次の例では、このことを示していますasync/await、RXとLINQを使用した非同期メッセージ処理

private Subject<string> MessageReceived = new Subject<string>(); 

//this method gets called every time a message is received from socket 
internal void OnReceiveMessage(string message) 
{ 
    MessageReceived.OnNext(message); 
    ProcessMessage(message); 
} 

public async Task<string> TestMethod() 
{ 
    var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync(); 
    await SendMessage("ABC"); 

    //some code... 

    //if response we are waiting for comes before next row, we miss it 
    return await expectedMessage; 
} 

のtestMethod()(その前にいくつかの他のメッセージがあるかもしれません)ソケットに「ABC」を送信し、例えば「DEF」を受信したとき続きます。

これはほとんど動作しますが、競合状態があります。このコードではメッセージが受信されないようです。return await expectedMessage;これは問題です。メッセージがその前に到着することがあるためです。

+0

あなたは 'expectedMessage'を待っているときにポンが送信されないことを意味しますか?その場合、 'OnReceiveMessage'の中に' MessageReceived.OnNext() 'を格納せずに' ping'メッセージを処理する必要があります。 –

+0

実際には、その部分を除いています。それは正常に動作しています。 ProcessMessages()メソッドでは、pingメッセージに応答してpongを送信しますが、問題は非同期メソッドで特定のイベント(ソケットから受信したメッセージ)を待っています – Juha

答えて

4

FirstOrDefaultAsyncここでは、await行までサブスクライブしないので、競合状態が指摘されています。ここでは、それを置き換えることができます方法は次のとおりです。

var expectedMessage = MessageReceived 
     .Where(x => x.EndsWith("D") && x.EndsWith("F")) 
     .Take(1) 
     .Replay(1) 
     .RefCount(); 

    using (var dummySubscription = expectedMessage.Subscribe(i => {})) 
    { 
     await SendMessage("ABC"); 

     //Some code... goes here. 

     return await expectedMessage; 
    } 

.Replay(1)は1つが存在すると仮定すると、新しいサブスクリプションは、最新のエントリを取得することを確認します。それはサブスクライバがリッスンしている場合にのみ動作します。したがって、dummySubscriptionです。

関連する問題