2017-04-10 6 views
2

私の角張ったアプリケーションは、WebSocketを使ってバックエンドと通信します。WebSocketで観測可能なRxJ

私のテストケースでは、2つのクライアントコンポーネントがあります。 Observableタイマーは、期待どおりに2つの異なるクライアントIDを出力します。

各ngOnInit()は、クライアントのIDも表示します。

何らかの理由で、websocketService.observeClient()のサブスクリプションがメッセージごとに2回呼び出されましたが、this.client.idは常に2番目のクライアントの値を出力します。

HERESに私のクライアントコンポーネント

@Component({ 
... 
}) 
export class ClientComponent implements OnInit { 

    @Input() client: Client; 

    constructor(public websocketService: WebsocketService) { 
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id)); 
    } 

    ngOnInit() { 

    console.log(this.client.id); 
    this.websocketService.observeClient().subscribe(data => { 
     console.log('message', this.client.id); 
    }); 

    } 

} 

そして、私のWebSocketサービス

@Injectable() 
export class WebsocketService { 

    private observable: Observable<MessageEvent>; 
    private observer: Subject<Message>; 

    constructor() { 

    const socket = new WebSocket('ws://localhost:9091'); 

    this.observable = Observable.create(
     (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
     } 
    ); 

    this.observer = Subject.create({ 
     next: (data: Message) => { 
     if (socket.readyState === WebSocket.OPEN) { 
      socket.send(JSON.stringify(data)); 
     } 
     } 
    }); 

    } 

    observeClient(): Observable<MessageEvent> { 
    return this.observable; 
    } 

} 

編集[OK]を私の知る限り、それはその観測事実に関係しています読んだことがあるよう

ユニキャストオブジェクトであり、そのためにSubjectを使用する必要がありますが、Subjectを作成する方法はわかりません。

+0

あなたは問題があなたの 'providers'の設定がされていませんか?あなたの説明から、各クライアントコンポーネントに 'WebsocketService'の独自のインスタンスが必要なように見えます。 – martin

+0

いいえバックエンドに接続された1つの注入されたwebsocketServiceが必要です – Pascal

答えて

1

share演算子を使用して、サブスクライバ間で共有する必要があります。

this.observable = Observable.create(
    (observer: Observer<MessageEvent>) => { 
     socket.onmessage = observer.next.bind(observer); 
     socket.onerror = observer.error.bind(observer); 
     socket.onclose = observer.complete.bind(observer); 
     return socket.close.bind(socket); 
    } 
).share(); 

また、このサービスがシングルトンであることを確認してください。

ドク:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md

0

あなたはあなたのための主題を作成し、ビルトインのWebSocket機能を使用することができますrxjs 5のよう。また、エラーの後にストリームに再購読するときに再接続します。この回答を参照してください:

https://stackoverflow.com/a/44067972/552203

TLDR:

let subject = Observable.webSocket('ws://localhost:8081'); 
subject 
    .retry() 
    .subscribe(
     (msg) => console.log('message received: ' + msg), 
     (err) => console.log(err), 
    () => console.log('complete') 
    ); 
subject.next(JSON.stringify({ op: 'hello' })); 
関連する問題