2016-05-22 8 views
4

RxJSでWebSocketストリームをモデル化する方法は何ですか?RxJSでWebSocketストリームをモデリングする

明らかなのは、ストリームのストリームがメッセージのストリームを放出することです。

tソケットのストリームを作成した場合、メッセージのストリームを作成しても、これらのメッセージを送信した人をどのように保存できますか?

ソケットストリームは、私の最初のステップだった:

const socket$ = Observable.create(({complete, next}) => { 

    const server = new WebSocketServer({server: someHttpServer}) 

    server.on('connection', next) 

    return() => { 
    server.close() 
    complete() 
    } 

}) 

しかし、私は、私がメッセージを持ってソケットを必要とするため、メッセージの流れは、少し難しいです。すべてのソケットからすべてのソケットメッセージをストリームに観察

const message$ = socket$.flatMap(socket => Observable.create(({complete, next}) => { 

    socket.on('message', next) 
    socket.on('close', complete) 

    return() => socket.close() 

})).share() 

これは、モデリングの私の拳素朴な試みでした。しかし、私がそれを購読すれば、私はもはやソケットを持っていないので、これは単方向性になります。

私は

socket$ -> message$ -> server-processing -> socket$ 

をしたいが、複数のユースケースは、応答、ブロードキャスト、マルチキャストおよびユニキャストのためにそこにあります。

答えて

2

私はflatMapflatMapの引数の値とflatMapの(平坦化)の戻り値を受け取り、第2の機能を、取ることが分かりました。この関数は、後のすべての演算子で使用される新しい値を返すことができます。

const socketMessage$ = socket$.flatMap(

    socket => Observable.create(({complete, next}) => { 

    socket.on('message', next) 
    socket.on('close', complete) 

    return() => socket.disconnect() 

    }), 

    (socket, message) => ({socket, message}) 

).share() 
1

あなたが探しているものがわからない場合は(still preserve who sent these messagesとは何ですか?)、オブザーバブルにWebソケットをラップするライブラリがあります。ここでのドキュメントを見て:https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/doc/operators/fromwebsocket.md

+0

ありがとうございますが、これはクライアント向けのようです。私は私の質問に詳細を追加しました:) –

+0

このライブラリは廃止されました。それはRxJSに直接折り畳まれています:https://github.com/ReactiveX/rxjs/tree/master/src/internal/observable/dom – Sawtaytoes

関連する問題