2016-08-21 9 views
0

は私がRxJavaと私のデータを保存する方法があります:SQLiteのトランザクションとRxJava

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnUnsubscribe { dbKeeper.endTransaction() } 
} 

そして私はこのように、このメソッドを使用します。

 notesManager.put(note) 
      .switchMap { notesManager.getHashtags() } 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe {view.setHashtags(it) } 

そしてdoOnUnsubscribeから選択しようとしてgetHashtags()として呼び出されることはありませんdbはロックされたstartTransaction()です。デッドロック、ヘイ。 doOnUnsubscribe(...)doOnTerminate(...)と置き換えてみましょう。 Observablesubscriber.unsubscribe()によって中断される場合

override fun put(note: Note): Observable<Note> { 
    validateNote(note) 
    return Observable.just(note) 
      .doOnNext { dbKeeper.startTransaction() } 
      .doOnNext { storeHashtags(note) } 
      .doOnNext { storeImages(note) } 
      .map { notesMapper.transform(note) } 
      .flatMap { notesDataStore.put(it) } 
      .map { notesMapper.transform(it) } 
      .doOnNext { dbKeeper.setTransactionSuccessful() } 
      .doOnTerminate { dbKeeper.endTransaction() } 
} 

しかし、今のトランザクションは閉じません。

私の状況を解決するにはどうすればよいですか?

追加情報: データを書き込む/読み込むためにwritableDbインスタンスを1つ使用します。

答えて

1

これはRxには適していないと思います。私の推薦は、通常のtry-最終的に(Javaの恩赦)を使用して取引を行うために、次のようになります。

Observable<Note> put(Note note) { 
    return just(note) 
     .doOnNext(n -> { 
      validateNote(n); 
      dbKeeper.startTransaction(); 
      try { 
       storeHashtags(n); 
       storeImages(n); 
       notesDataStore.put(notesMapper.transform(n)); 
       dbKeeper.setTransactionSuccessful(); 
      } finally { 
       dbKeeper.endTransaction(); 
      } 
     }); 
} 

編集:たぶん、これがより有用である:

fun <T> withTransaction(sourceObservable: Observable<T>): Observable<T> { 
    val latch = AtomicInteger(1) 
    val maybeEndTransaction = Action0 { 
     if (latch.decrementAndGet() == 0) { 
      endTransaction() 
     } 
    } 
    return Observable.empty<T>() 
      .doOnCompleted { startTransaction() } 
      .mergeWith(sourceObservable) 
      .doOnNext { setTransactionSuccessful() } 
      .doOnTerminate(maybeEndTransaction) 
      .doOnUnsubscribe(maybeEndTransaction) 
} 

このようにそれを使用します

override fun put(note: Note): Observable<Note> { 
    return Observable.just(note) 
     .doOnNext { validateNote(note) } 
     .doOnNext { storeHashtags(note) } 
     .doOnNext { storeImages(note) } 
     .flatMap { notesDataStore.put(notesMapper.transform(note)) } 
     .map { notesMapper.transform(it) } 
     .compose { withTransaction } 
} 

これは、トランザクションが一度だけ確実に終了するようにします。トランザクションアマンジャーがそれを処理できないか、新しいスレッドを既存のトランザクションに関連付けるようにスケジューラを変更していない限り、最初の観測可能なチェーン内のスレッドを切り替えないように注意してください。

+0

ええ、私はすでにRxがこのケースには適していないことを理解しました。しかし、プロジェクトのアーキテクチャを変更するのは遅すぎる。あなたのソリューションに感謝します。たぶん、 'notesDataStore.put()'やそれに類するメソッドのために 'BlockingObservable'を使うでしょう。 – Alexandr

+1

編集を追加しました。 –

+0

面白そうです、私はそれを試してみます。ありがとう! – Alexandr

関連する問題