2017-12-03 17 views
0

私は、rxjs WebsocketSubjectがjsonrpcメッセージを送信する角度5のアプリケーションを持っています。rxjsが最初にストリーム・チェーン全体を完成させます

これは私のsendRequest機能

sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return this.onResponse().filter((response: Response) => { 
    return response.id === request.id; 
    }).first().toPromise().then((response) => { 

    console.log(response); 

    if (response.error) { 
     console.log('error'); 
     throw new RpcError(response.error); 
    } 

    return response; 

    }); 

} 

私はこのフィルタのサブスクリプションを完了するために、最初の()演算子を使用しています。しかしonResponse()は私のWebSocketSubjectから直接来て、これが完了します。

オリジナルの被写体をデカップリングする方法はありますか?

新しいObservale.create(...)を作成する必要がありますか?

書かれた.filter関数はどうなりますか?それはどこでも最後になるのですか、それとも永続的なフィルタコールを防ぐためにどこでも削除する必要がありますか?


編集1

また、これを使用することは助けにはなりません。

sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return new Promise<Response>((resolve, reject) => { 

    const responseSubscription = this.onResponse().filter((response: Response) => { 
     console.log('filter'); 
     return response.id === request.id; 
    }).subscribe((response: Response) => { 

     // responseSubscription.unsubscribe(); 
     resolve(response); 

    }); 

    }); 

} 

私がサブスクライブを実行すると、websocketSubject全体が閉じられます。そうしないと、リクエストごとにさらに時間がかかることがあります。


編集2

ここで私は私のログに

Using Angular 5.0.2 
websocket.service.ts:27 ctor 
websocket.service.ts:69 WS state Connecting 
core.js:3565 Angular is running in the development mode. Call enableProdMode() to enable the production mode. 
websocket.service.ts:96 Request {jsonrpc: "2.0", id: "b042005c-5fbf-5ffc-fbd1-df68fae5882e", method: "appointment_list_get", params: undefined} 
websocket.service.ts:69 WS state Connected 
websocket.service.ts:103 filter 
websocket.service.ts:69 WS state Disconnected 

import {Injectable} from "@angular/core"; 
import {WebSocketSubject, WebSocketSubjectConfig} from "rxjs/observable/dom/WebSocketSubject"; 
import {MessageFactory, Notification, Request, Response, RpcError} from "../misc/jsonrpc"; 
import {ReplaySubject} from "rxjs/ReplaySubject"; 
import {Observable} from "rxjs/Observable"; 
import 'rxjs/add/operator/toPromise'; 
import 'rxjs/add/operator/filter'; 
import 'rxjs/add/operator/first'; 
import 'rxjs/add/observable/from'; 

export enum ConnectionState { 
    CONNECTED = "Connected", 
    CONNECTING = "Connecting", 
    CLOSING = "Closing", 
    DISCONNECTED = "Disconnected" 
} 

@Injectable() 
export class WebsocketService { 

    private connectionState = new ReplaySubject<ConnectionState>(1); 
    private socket: WebSocketSubject<ArrayBuffer | Object>; 
    private config: WebSocketSubjectConfig; 

    constructor() { 

    console.log('ctor'); 

    const protocol = location.protocol === 'https' ? 'wss' : 'ws'; 
    const host = location.hostname; 
    const port = 3000; // location.port; 

    this.config = { 
     binaryType: "arraybuffer", 
     url: `${protocol}://${host}:${port}`, 
     openObserver: { 
     next:() => this.connectionState.next(ConnectionState.CONNECTED) 
     }, 
     closingObserver: { 
     next:() => this.connectionState.next(ConnectionState.CLOSING) 
     }, 
     closeObserver: { 
     next:() => this.connectionState.next(ConnectionState.DISCONNECTED) 
     }, 
     resultSelector: (e: MessageEvent) => { 

     try { 

      if (e.data instanceof ArrayBuffer) { 
      return e.data; 
      } else { 
      return JSON.parse(e.data); 
      } 

     } catch (e) { 

      console.error(e); 
      return null; 

     } 

     } 
    }; 

    this.connectionState.next(ConnectionState.CONNECTING); 
    this.socket = new WebSocketSubject(this.config); 

    this.connectionState.subscribe((state) => { 
     console.log(`WS state ${state}`); 
    }); 

    } 

    onBinaryData(): Observable<ArrayBuffer> { 
    return this.socket.filter((message: any) => { 
     return message instanceof ArrayBuffer; 
    }); 
    } 

    onMessageData(): Observable<Object> { 
    return this.socket.filter((message: any) => { 
     return !(message instanceof ArrayBuffer); 
    }); 
    } 

    onResponse(): Observable<Response> { 
    return this.onMessageData().filter((message) => { 
     return MessageFactory.from(message).isResponse(); 
    }).map((message): Response => { 
     return MessageFactory.from(message).toResponse(); 
    }); 
    } 

    sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return new Promise<Response>((resolve, reject) => { 

     const responseSubscription = this.onResponse().filter((response: Response) => { 
     console.log('filter'); 
     return response.id === request.id; 
     }).subscribe((response: Response) => { 

     responseSubscription.unsubscribe(); 
     resolve(response); 

     }); 

    }); 

    } 

    sendNotification(notification: Notification): void { 
    this.socket.next(JSON.stringify(notification)); 
    } 

} 

そして結果を書かれている全websocketServiceである私が見つける必要があります方法のデカプリンg元のストリームからのフィルタを何とかしてください。

+0

「first()。toPromise()。then'を使用する必要があるのはなぜですか? 'this.onResponse()。filter(...)。subscribe(...);'のように購読する方が簡単でしょうか? – martin

+0

はい、私の理解では(多分私は間違っています)私はそのサブスクリプションをクリーンアップする必要があり、私は単一の結果を期待しています。だからプロミスを使っても問題ありません。しかし.toPromiseはObservalbeで完全に解決されます – Pascal

+0

今、あなたが今やっていることの問題は何ですか? – martin

答えて

1

これは機能しています。 重要なのは、メッセージ処理を下層のwebSocketSubjectから切り離すことでした。

import {Injectable} from "@angular/core"; 
import {WebSocketSubject, WebSocketSubjectConfig} from "rxjs/observable/dom/WebSocketSubject"; 
import {MessageFactory, Notification, Request, Response, RpcError} from "../misc/jsonrpc"; 
import {ReplaySubject} from "rxjs/ReplaySubject"; 
import {Observable} from "rxjs/Observable"; 
import 'rxjs/add/operator/toPromise'; 
import 'rxjs/add/operator/filter'; 
import 'rxjs/add/operator/first'; 
import 'rxjs/add/observable/from'; 
import {Subject} from "rxjs/Subject"; 

export enum ConnectionState { 
    CONNECTED = "Connected", 
    CONNECTING = "Connecting", 
    CLOSING = "Closing", 
    DISCONNECTED = "Disconnected" 
} 

@Injectable() 
export class WebsocketService { 

    private connectionState = new ReplaySubject<ConnectionState>(1); 
    private socket: WebSocketSubject<ArrayBuffer | Object>; 
    private config: WebSocketSubjectConfig; 

    private messageObserver = new Subject<MessageFactory>(); 
    private binaryObserver = new Subject<ArrayBuffer>(); 

    constructor() { 

    const protocol = location.protocol === 'https' ? 'wss' : 'ws'; 
    const host = location.hostname; 
    const port = 3000; // location.port; 

    this.config = { 
     binaryType: "arraybuffer", 
     url: `${protocol}://${host}:${port}`, 
     openObserver: { 
     next:() => this.connectionState.next(ConnectionState.CONNECTED) 
     }, 
     closingObserver: { 
     next:() => this.connectionState.next(ConnectionState.CLOSING) 
     }, 
     closeObserver: { 
     next:() => this.connectionState.next(ConnectionState.DISCONNECTED) 
     }, 
     resultSelector: (e: MessageEvent) => { 

     try { 

      if (e.data instanceof ArrayBuffer) { 
      return e.data; 
      } else { 
      return JSON.parse(e.data); 
      } 

     } catch (e) { 

      console.error(e); 
      return null; 

     } 

     } 
    }; 

    this.connectionState.next(ConnectionState.CONNECTING); 
    this.socket = new WebSocketSubject(this.config); 

    this.socket.filter((message: any) => { 
     return message instanceof ArrayBuffer; 
    }).subscribe((message: ArrayBuffer) => { 
     this.binaryObserver.next(message); 
    }); 

    this.socket.filter((message: any) => { 
     return !(message instanceof ArrayBuffer); 
    }).subscribe((message: ArrayBuffer) => { 
     this.messageObserver.next(MessageFactory.from(message)); 
    }); 

    this.connectionState.subscribe((state) => { 
     console.log(`WS state ${state}`); 
    }); 

    } 

    onResponse(): Observable<Response> { 
    return this.messageObserver.filter((message: MessageFactory) => { 
     return message.isResponse(); 
    }).map((message: MessageFactory): Response => { 
     return message.toResponse(); 
    }); 
    } 

    sendRequest(request: Request): Promise<Response> { 

    console.log(request); 

    this.socket.next(JSON.stringify(request)); 

    return this.onResponse().filter((response: Response) => { 
     return request.id === response.id; 
    }).first().toPromise().then((response) => { 

     console.log(response); 

     if (response.error) { 
     console.log('error'); 
     throw new RpcError(response.error); 
     } 

     return response; 

    }); 

    } 

    sendNotification(notification: Notification): void { 
    this.socket.next(JSON.stringify(notification)); 
    } 

} 
関連する問題