2017-04-06 6 views
-1

私はリアクティブプログラミングではかなり新しいですが、呼び出しとSMSのリストを収集し、単一のものにするためにrxjavaを適用しようとしています。このアイデアは、連続したアイテムが並んでいる状態でカーソルを並行して繰り返すことです。 これ以降、すべてのアイテムが1つのリストに集約されます。Flowableの2つの並行パブリッシャーの結果を収集

しかし、doOnCompleteでは、すべてのデータが収集されているわけではありません(約150個のアイテムがあり、〜25個しか追加されていません)。 私は、問題はFlowable.concatがすべてのアイテムを待つのではなく、それが唯一の提案だと考えています。

public void readLatestActivity() { 
    Cursor callsCursor = getCallsCursor(); 
    Cursor smsCursor = getSmsCursor(); 

    if (callsCursor == null || smsCursor == null) { 
     return; 
    } 

    SortedSet<MActivity> activities = new TreeSet<>((left, right) -> left.getReceiveTime() < right.getReceiveTime() ? 1 : 0); 

    Flowable.concat(getCallsPublisher(callsCursor), getSmsPublisher(smsCursor)) 
      .subscribeOn(Schedulers.io()) 
      .doOnNext(new Consumer<MActivity>() { 
       @Override 
       public void accept(@NonNull MActivity mActivity) throws Exception { 
        if (mActivity != null){ 
         activities.add(mActivity); 
        } 
       } 
      }) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .doOnComplete(new Action() { 
       @Override 
       public void run() throws Exception { 
        Timber.d("doOnComplete"); 
        callsCursor.close(); 
        smsCursor.close(); 
        mBus.post(new EActivities(activities)); //<--- here I'm getting ~25 items 
       } 
      }) 
      .subscribe(); 
} 

private Publisher<MActivity> getCallsPublisher(Cursor callsCursor) { 
    return Flowable.fromIterable(RxCursorIterable.from(callsCursor)) 
      .take(callsCursor.getCount() < LIMIT ? callsCursor.getCount() : LIMIT) 
      .subscribeOn(Schedulers.computation()) 
      .parallel() 
      .runOn(Schedulers.computation()) 
      .map((Function<Cursor, MActivity>) this::readCallFromCursor) 
      .sequential() 
      .observeOn(AndroidSchedulers.mainThread()); 

} 

private Publisher<MActivity> getSmsPublisher(Cursor smsCursor) { 
    return Flowable.fromIterable(RxCursorIterable.from(smsCursor)) 
      .take(smsCursor.getCount() < LIMIT ? smsCursor.getCount() : LIMIT) 
      .subscribeOn(Schedulers.computation()) 
      .parallel() 
      .runOn(Schedulers.computation()) 
      .map((Function<Cursor, MActivity>) this::readSmsFromCursor) 
      .sequential() 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

smsを読み込み、呼び出しがうまく動作するメソッド。私は問題がどこかにあると信じています。

はまた、私は、私は、問題がTreeSetを作成するときに使用するコンパレータであると考えIterable

public class RxCursorIterable implements Iterable<Cursor> { 

private Cursor mIterableCursor; 

public RxCursorIterable(Cursor c) { 
    mIterableCursor = c; 
} 

public static RxCursorIterable from(Cursor c) { 
    return new RxCursorIterable(c); 
} 

@Override 
public Iterator<Cursor> iterator() { 
    return RxCursorIterator.from(mIterableCursor); 
} 

private static class RxCursorIterator implements Iterator<Cursor> { 

    private final Cursor mCursor; 

    public RxCursorIterator(Cursor cursor) { 
     mCursor = cursor; 
    } 

    public static Iterator<Cursor> from(Cursor cursor) { 
     return new RxCursorIterator(cursor); 
    } 

    @Override 
    public boolean hasNext() { 
     return !mCursor.isClosed() && mCursor.moveToNext(); 
    } 

    @Override 
    public Cursor next() { 
     return mCursor; 
    } 
} 
} 

答えて

1

ようcursor行為を行うクラスを見つけました。 left.getReceiveTime() > right.getReceiveTime()の場合は-1を返しません。つまり、一部のアクティビティが誤って重複しているとみなされます。

私はまた、このようなあなたのgetCallsPublisherコードを簡素化することをお勧め:

return Flowable.generate(getCallsCursor(), 
     (cursor, emitter) -> { 
      if (!cursor.isClosed() && cursor.moveToNext()) 
       emitter.onNext(readCallFromCursor(cursor)); 
      else 
       emitter.onComplete(); 
     }, Cursor::close) 
    .take(LIMIT); 

とはRxCursorIterableを取り除きます。

+0

ありがとうございます。ラムダなしでこのスニペットを書いてください。使用している関数を理解したいのですが。 – AnZ

+1

@AnZ私はこの関数を使用しています: 'generate(initialState、generator、disposeState)' http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#generate(java.util.concurrent .Callable、%20io.reactivex.functions.BiConsumer、%20io.reactivex.functions.Consumer) –

関連する問題