2016-08-24 17 views
3

linqを使用して、websocket接続を介して受信したイベントを処理します。私を切り株Reactive Extensionsで着信websocketメッセージを監視していますか?

private static void Main() 
    { 
     string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting"; 
     using (WebSocket ws = new WebSocket(WsEndpoint)) 
     { 
      ws.OnMessage += Ws_OnMessage; 

      ws.Connect(); 
      Console.ReadKey(); 
      ws.Close(); 
     } 
    } 

    private static void Ws_OnMessage(object sender, MessageEventArgs e) 
    { 
     Console.WriteLine(e.Data); 
    } 

最初のシンクタンクは、イベント・ストリームのいくつかの並べ替えにws.OnMessageを有効にする方法である:これは私がこれまで持っているものです。私は外部イベントソースを反応的な拡張子で観測するためのオンラインサンプルを見つけることができません。私はjsonオブジェクトにメッセージを解析し、それをフィルタリングして集約しようと考えています。

誰かがwebsocketメッセージからobservableを作成し、それにサブスクライブする例を提供できますか?


編集:選択した解答から最終的な作業コード

唯一の違いは、私はあなたがこのようなあなたに観察を設定する必要がありますObservable.Using

//------------------------------------------------------- 
// Create websocket connection 
//------------------------------------------------------- 
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting"; 
WebSocket socket = new WebSocket(wsEndpoint); 


//------------------------------------------------------- 
// Create an observable by wrapping ws.OnMessage 
//------------------------------------------------------- 
var globalEventStream = Observable 
    .Using(
     () => socket, 
     ws => 
      Observable 
       .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
        handler => ws.OnMessage += handler, 
        handler => ws.OnMessage -= handler)); 
//--------------------------------------------------------- 
// Subscribe to globalEventStream 
//--------------------------------------------------------- 

IDisposable subscription = globalEventStream.Subscribe(ep => 
{ 
    Console.WriteLine("Event Recieved"); 
    Console.WriteLine(ep.EventArgs.Data); 
}); 

//---------------------------------------------------------- 
// Send message over websocket 
//---------------------------------------------------------- 
socket.Connect(); 
socket.Send("test message"); 
// When finished, close the connection. 
socket.Close(); 
+0

あなたは何 'WebSocket'ライブラリを使用している:

ここで作業コードですか? – Jacob

+0

私は 'WebSocketSharp'を使用しています – mooglinux

+0

@mooglinux - なぜ' .Publish() 'コールがありますか?それは、あなたがobservableで '.Connect()'を呼び出すまで、すべての値を防ぎます。おそらく '.Publish()'だけを削除することができます。 – Enigmativity

答えて

6

にそれを渡す前のWebSocketを初期化していることです。

var observable = 
     Observable 
      .Using(
       () => new WebSocket(WsEndpoint), 
       ws => 
        Observable 
         .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
          handler => ws.OnMessage += handler, 
          handler => ws.OnMessage -= handler)); 

これにより、正しくsocが作成されます。ケートを観察し、観察可能なものが購読されているときにそのイベントを観察する。サブスクリプションが廃棄されると、イベントから正しく切り離され、ソケットが処分されます。


observableのタイプはIObservable<EventPattern<MessageEventArgs>>になります。あなたは、このように、この観測可能な消費:投稿NuGet参照の

IDisposable subscription = observable.Subscribe(ep => 
{ 
    Console.WriteLine(ep.EventArgs.Data); 
}); 

感謝を。

const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting"; 

Console.WriteLine("Defining Observable:"); 

IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable = 
    Observable 
     .Using(
      () => 
      { 
       var ws = new WebSocketSharp.WebSocket(WsEndpoint); 
       ws.Connect(); 
       return ws; 
      }, 
      ws => 
       Observable 
        .FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
         handler => ws.OnMessage += handler, 
         handler => ws.OnMessage -= handler)); 

Console.WriteLine("Subscribing to Observable:"); 

IDisposable subscription = observable.Subscribe(ep => 
{ 
    Console.WriteLine("Event Recieved"); 
    Console.WriteLine(ep.EventArgs.Data); 
}); 

Console.WriteLine("Writing to Source:"); 

using (var source = new WebSocketSharp.WebSocket(WsEndpoint)) 
{ 
    source.Connect(); 
    source.Send("test"); 
} 
+0

websocketを 'Observable.Using'の中に置く利点は、他の答えとは異なりますか? – mooglinux

+3

observableのサブスクリプションを破棄すると、ソケットも破棄されます。 – Shlomo

+0

'handler'のカスタム関数を書いていますか?もしそうなら、戻り値の型は何ですか? 'handler'をデータを出力し、' void'を返す関数に変更しようとしましたが、うまくいかなかった。 – mooglinux

関連する問題