2016-06-29 22 views
0

RxJavaを完全に新しくしているわけではありませんが、単純なタスクのように思えます。RxJava - 完了する前に他のObservable/Completableをトリガーした後に完了

反応性のあるAPIを公開するデータソースがあります。データを取得して返すだけで、何も送信しない場合は自動的に接続を閉じます。非同期的に異なるスケジューラで実行されている

public Observable<Object> execute(String query) { 

    Single<RxConnection> rxConnection = getRxDB().getConnection(); 

    return rxConnection.flatMapObservable(conn -> { 
     Observable<Object> rxResult = conn.query(query); 

     return rxResult.doOnCompleted(() -> { 
      conn.close(); // THIS DOES NOT WORK. I would like to close the connection and to wait without blocking. 
     }); 

    }); 

} 

conn.query()とはconn.close():

は、ここに私のコードです。 conn.close()がサブスクライバを持たないCompletableを返すため、このコードは機能しません。さらに、doOnCompletedメソッド自体を手動で購読すると、rxResult Observableは接続が閉じられるのを待たずに完了します。

私は「を実行(文字列クエリ)」メソッドは、その観測を返すことたい: - )(conn.queryによってフェッチすべての項目を放ち を呼び出す - 放出する項目がない場合は、それがトリガーconn.close() - conn.close()Completableの後でのみ完了します。

ありがとうございました。

答えて

3

観察可能なものはサブスクリプションを取得し、セットアップ中は取得しません。

編集:未完了のサブスクリプションが発生したときに接続を閉じるdoOnUnsubscribe/doOnTerminateも必要です。

+0

これはかなり簡単な方法で私の問題を解決するようです。さらに、エラーの可能性も扱います。 しかし、rxがすべてを無意味なオブジェクトをたくさん作成して観測可能にするように強制するのは恐ろしいことです。 –

0

リソースの閉鎖はObservable.usingで管理されています

Observable<T> obs = 
    Observable.using(
     resourceFactory, 
     observableFactory, 
     disposeAction) 

この作成方法は、resourceFactoryによって作成されたリソースが停止(終了またはエラー)の上に配置されたり解除されることを保証します。これは、接続、クエリおよびリリースが場合にのみ起こることを保証します

private Observable<Object> executeDeferred(String query) { 
    // your original execute() method here, including doOnComplete 
} 


public Observable<Object> execute(String query) { 
    return Observable.defer(() -> executeDeferred(query)); 
} 

:、あなたが観察が購読ときのみ、クエリを実行したい

Observable<Object> rxResult = conn.query(query) 
.concatWith(conn.close().toObservable()) 
.onErrorResumeNext(e -> 
    conn.close().toObservable().concatWith(Observable.error(e))); 
+0

これは興味深い解決策です。しかし、私はそれを働かせることはできません。実際、resourceFactoryはObservableを返すことはできませんが、私の場合ではないリソースインスタンスだけを返すことができます。 –

+0

resourceFactoryはリソースを提供し、observableFactoryはresourceFactoryによって作成されたリソースからobservableを作成します。 @akarnokdはあなたの特定の問題の詳細をカバーしていますが、リソースを適切に閉鎖するためには 'Observable.using'を採用するべきです。 –

0

ので:いかが

関連する問題