2016-09-12 10 views
2

をカプセル化して、ContentProviderを照会し、ContentProviderカーソルをサブスクライブして継続的な更新を提供するロジックをカプセル化したいとします。RxJavaに必要なルーパースレッドを管理する方法

私は観察可能なので、私はそれをSchedulers.io()で購読する必要があります。その問題は、ContentObserverを登録することができないということです。なぜなら、ルーパーの準備スレッドが必要だからです。

これを管理するにはどうすればよいでしょうか。を1つのObservableでカプセル化しています。

あることを示すために、コード:

public Observable<Integer> unreadCountObservable() { 
    return Observable.create(subscriber -> { 
     new UnreadCountObservable(subscriber); 
    }); 
} 

private class UnreadCountObservable { 
    private Subscriber subscriber; 

    public UnreadCountObservable(Subscriber subscriber) { 
     this.subscriber = subscriber; 
     Cursor cursor = queryUnread(subscriber); 
     cursor.registerContentObserver(observer); 
     subscriber.add(Subscriptions.create(() -> { 
      cursor.unregisterContentObserver(observer); 
      cursor.close(); 
     })); 
    } 

    @NonNull 
    private Cursor queryUnread(Subscriber subscriber) { 
     Cursor cursor = contextProvider.getContext().getContentResolver().query(Uri.parse(CONTENT_URI),SMS_PROJECTION,SMS_SELECTION_UNREAD,SMS_PROJECTION,null); 
     if(cursor.moveToNext()) { 
      Integer count = cursor.getInt(0); 
      subscriber.onNext(count); 
     } else { 
      subscriber.onNext(0); 
     } 
     return cursor; 
    } 

    private ContentObserver observer = new ContentObserver(new Handler()) { 
     @Override 
     public boolean deliverSelfNotifications() { 
      return false; 
     } 

     @Override 
     public void onChange(boolean selfChange) { 
      Timber.d("New sms data changed"); 
      queryUnread(subscriber); 
     } 
    }; 
} 

注1上記のコードの問題は、それが原因registerObserverに.subscribeOn(Schedulers.io()で呼び出すことができない、それはそれは、その後のクエリをmainThreadと呼ばれていた場合ということです単一Observable中のすべてをカプセル化重要な要件と、この質問

の動機である。また、

注)その上で実行

ここで、Observableを使用するアクティビティのHandlerThreadを作成し、そのスレッドからルーパーを使用することをお勧めします。しかし、より良い選択肢があるかどうか、また一般的なスケジューラ(例えばlooperIoScheduler())を作成することが問題を引き起こす可能性があるかどうかを知りたい

答えて

-1

だから、どうしてこれは機能しませんか?

public Observable<Integer> unreadCountObservable() { 
    return Observable.defer(() -> Observable.create(subscriber -> { 
     new UnreadCountObservable(subscriber); 
    })) 
    .subscribeOn(Schedulers.io()); 
} 
+1

これは機能しません。 SubscribeOnはサブスクリプションチェーンに入りますが、チェーンの最初の文か最後の文かは関係ありません。そうすれば、ioスレッドの実行を延期し、UnreadCountObservableコンストラクターを実行することもできます。 – lujop

1

Observableチェーンでは、必要に応じてスレッドを頻繁に変更できます。見てくださいhere

機能rx.Observable#observeOn(rx.Scheduler)は、チェーン内の任意の場所に配置できます。このようなことを試みてください(擬似コード):

Observable.just(cursor) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .map((Cursor) -> { 
       cursor.registerContentObserver(observer); 
       return cursor; 
      } 
     }).observeOn(Schedulers.io()); 
subscriber.add(Subscriptions.create(() -> { 
     cursor.unregisterContentObserver(observer); 
     cursor.close(); 
    })); 
+0

ありがとうございますが、サンプルを展開して、カーソルの作成とオブザーバからの更新の送信も含めることができます。なぜなら、完全なコードは読みにくいと思うからです。 – lujop

関連する問題