2017-09-13 5 views
0

私はRxJsを使ってwebsocketから送られてくるデータストリームを聞いています。このpubsubソケットを使用するには、最初にpubsubソケットURLを返す別のソケットで認証する必要があります。Observerが認証後にWebSocketの作成を待つようにする方法

私はこれを現在、単一のサブスクライバでしか使用できません。別のものを追加する必要がありますが、両方の加入者が認証をトリガーして2番目のpubsubソケットを作成するため、私の現在のアプローチに問題があります。

明確にするには、アプリケーション全体に対して1つの認証ソケットと1つのpubsubソケットのみが必要です。しかし、私は、pubsubソケットを使用しようとする前に認証が行われるのを "待つ"必要があります。それ以外の場合は、pubsubソケットは未定義です(実行時にURLのみがわかるため)。

Websocket.service.ts

private connect(): Observable<IConnectionInfo> { 

    if (!this.authObservable) { 
     var credentials = { 
     "username": "bob", 
     "password": "slob" 
     } 

     this.authObservable = Observable.create((observer) => { 
     const socket = new WebSocket(this.authurl); 
     socket.onopen =() => { 
      console.log("Auth socket opened"); 
      socket.send(JSON.stringify(credentials)); 
     } 
     socket.onmessage = (event) => { 
      var connection_info = <IConnectionInfo>JSON.parse(event.data); 
      observer.next(connection_info); 
     } 

     return() => { 
      socket.close(); //invoked on unsubscribe 
      console.log("Auth socket closed"); 
     } 
     }) 
    } 
    return this.authObservable; 
    } 

public open_pubsub(events: string[]): Observable<IPubSubMessage> { 

    return this.connect() 
     .flatMap((connection_info: IConnectionInfo) => { 
     var url = "ws://" + this.hostName + ":" + connection_info.port + "/" + connection_info.ps; 

     var subscription = { 
      "subscribe": events 
     } 

     var authenticate_request = { 
      "authenticate": connection_info['token'] 
     } 

     if (!this.psObservable) { 
      this.psObservable = Observable.create((observer) => { 
      const socket = new WebSocket(url); 

      socket.onopen =() => { 
       console.log("PS socket opened"); 
       socket.send(JSON.stringify(authenticate_request)); 
       socket.send(JSON.stringify(subscription)); 
      } 
      socket.onmessage = (event) => { 
       var psmsg = <IPubSubMessage>JSON.parse(event.data); 
       observer.next(psmsg); 
      } 

      return() => { 
       socket.close(); //invoked on unsubscribe 
       console.log("PS socked closed"); 
      } 
      }) 
     } 
     return this.psObservable; 
     } 
    ); 
    } 

、観察者:

getSystemState(): Observable<string> { 

    return this._wsService.open_pubsub([MSG_SYSTEM_STATE_CHANGED]) 
     .map((response: IPubSubMessage): string => { 

       console.log(response.payload); 
       return "I wish this worked"; 
     }) 
     .catch(this.handleError); 
} 

すべてのヘルプ感謝

これが私の現在の試みです!

(削除されているような答えに基づいてEDIT、しかし私は、コードを変更し、実際には問題の。この固定部分は非常に有用であったが、それでも複数のソケットになり)

答えて

0

を見つけたもの私は欠けていた - share()演算子の魔法。

だから、両方の観測のために、私はこれを行う:

this.authObservable = Observable.create((observer) => { 
     ...(etc)... 
     }).share(); 

そして驚くほど、各ソケットの唯一の1が作成されます。

関連する問題