私のアプリは、私の周辺機器で実行するBLE操作のキューを効果的に持っています。各操作は、周辺機器への接続を確立することから始まり、Observable<RxBleConnection>
を返します。キューの最初の項目が接続を開始し、それ以降の操作では単にこの(共有)RxBleConnection
が受信されます。接続が確立できない、またはそれは、1つの動作中に廃棄され、再確立しようとし、残りのキューに入れられた操作各リトライ、場合PublishSubjectとのRxBleConnectionは接続を解放しない
Observable.concatDelayError(queuedOperations)
:簡体形態において
は、キューを介して実行されます接続。
接続が無効になると、キューに入れられた操作は、新しい接続を再確立するのではなく、今すぐ無効なRxBleConnection
を受け取るように動作を変更することにしました。再試行ロジックはまだ実行されていますが、これらのインスタンスでは即座に失敗します。接続に関連しない操作が失敗するその他の理由があります。
この現象を発生させるために、私はObservable<RxBleConnection>
の直後にPublishSubject
を構成します。この件名は単に元のRxBleConnection
---以下のコードを参照してください。接続がエラー状態に達した場合、サブジェクトへのその後のサブスクリプションにエラーが発生します。それ以外の場合は、共有接続が返されます。これはまさに私が望む振る舞いであり、エラーが発生したときに設計通りに動作するように見えます。しかし、私はすべてが成功した今、問題を抱えています。
変更する前に、キュー内の操作がすべて消費されると、接続は自動的に解放されました。ただし、PublishSubject
が追加されても操作は成功しますが、接続は開いたままです。デバッグステートメントを使用して、被験者のonUnsubscribe
とonTerminate
が呼び出されないことを確認しました。元のRxBleConnection
は最終的にタイムアウトし、そのonUnsubscribe
とonTerminate
が呼び出されます。
アプリが周辺機器に接続されたままになっている原因が間違っているのでしょうか。
private Observable.Transformer<RxBleConnection, RxBleConnection> createConnectionSubject() {
return rxBleConnectionObservable -> {
final PublishSubject<RxBleConnection> subject = PublishSubject.create();
rxBleConnectionObservable.subscribe(
subject::onNext,
subject::onError,
subject::onCompleted);
return subject;
};
}