2012-03-11 12 views
1

Rxを使用してソケットに接続し、データを受信して​​います。私が持っている問題は、ソケットに接続した後、データを受け取る前に承認のためにデータを送信する必要があるということです。TCPソケットに接続する方法、データを受信する前にデータを送信する

接続

public static IObservable<Unit> WhenConnected(this Socket socket, IPAddress address, int port) 
{ 
    return Observable.FromAsyncPattern<IPAddress, int>(
     socket.BeginConnect, 
     socket.EndConnect)(address, port); 
} 

データ受信

は、すべてのコードを投稿しませんが、要約すると、それを繰り返し非同期メソッドを受信開始/終了からバイトを受け取るためにTakeWhileを使用しています。

var receiveData = Observable.FromAsyncPattern 
    <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive); 

質問1:

今私の問題は、私は私のサブスクリプションを構築行くのですかですか?

var query = from _ in socket.WhenConnected(IPAddress.Parse(_host), _port) 
      //need to authorize before receiving data 
      from value socket.DataReceived().Repeat() 
      select value; 

using (query.Subscribe(... 

質問2

通常私は、私は完全に異なる何かをする必要がありますNetworkStreamではなく、ソケットを使用して情報のパケットを送信するのでしょうか?

答えて

0

Rxx libraryには、この種の操作に非常に役立つ拡張メソッドがいくつかあります。バージョン1.3のStreamExtensions.cs内部、

public static IObservable<byte[]> ReadObservable(this Stream stream, int count) 

およびネットワークストリーム上で使用することができる

public static IObservable<Unit> WriteObservable(this Stream stream, byte[] buffer, int offset, int count) 

あります。私は以下のが最善の方法であるかどうかわからないんだけど、以下のように設定することができます

NetworkStream networkStream = null; 
var setupStreamPlan = Observable 
    .When(WhenConnected(...)) 
    .Then(_ => networkStream = new NetworkStream(socket)); 
var authorizePlan = Observable 
    .When(setupStreamPlan) 
    .Then(_ => networkStream.WriteObserable(/*message to write*/); 
var listenPlan = Observable 
    .When(authorizePlan) 
    .Then(_ => networkStream.ReadObservable(10).Repeat()); 
var constantSizeMessageStream = Observable 
    .When(listenPlan) 
    .Switch(); 

EDIT:networkStreamは

をセットアップするために可
関連する問題