2016-04-03 13 views
1

こんにちは私はRXJavaの初心者です。 私は、どのアイテムが受け取られたかに応じてアイテムを放出し続ける観察可能な解決策を探しています。RXJava PausableBuffer

私たちの状態はこのInteger述語であると言うことができます:myNumberSubject.onNext(someInt);奇数が追加される前に追加されたすべての数字がバッファにストックされているが、もの:我々は次のように件名に番号を追加すると

Func1<Integer, Boolean> isOdd = number -> number%2==1; 

最初の奇数が追加されると、バッファ内のすべての数値が1つのバースト(奇数の項目を含む)で放出されます。

その後、すべての数字が奇数である限り、1つずつ発行されます。もう一度偶数が追加されると、それはバッファに置かれます。

実際の作業例は見つかりましたが、pausableBufferのこの大理石の例は、私がやろうとしていることを正確に行う可能性があります。 http://rxmarbles.com/#pausableBuffered このトリックを行う既存のRXJavaソリューションがあることを願っています。ここに私自身のハックの仕事の解決策があります。

public class PausableBuffer<R> { 

    private boolean isPaused; 
    private List<R> buffer; 
    private ReplaySubject<R> regulatedSubject; 

    private PausableBuffer(){ 
     regulatedSubject = ReplaySubject.create(); 
     buffer=new LinkedList<>(); 
    } 

    public static <R>Observable<R> create(Observable<R> observable, Func1<R, Boolean> continueCondition){ 
     PausableBuffer<R> pausableBuffer = new PausableBuffer<>(); 
     observable.subscribe(value -> { 
      synchronized(pausableBuffer) { 
       if(pausableBuffer.isPaused){ 
        pausableBuffer.buffer.add(value); 
        if(continueCondition.call(value)){ 
         pausableBuffer.isPaused=false; 
         for (R r : pausableBuffer.buffer) { 
          pausableBuffer.regulatedSubject.onNext(r); 
         } 
         pausableBuffer.buffer.clear(); 
        } 
       }else{ 
        if(continueCondition.call(value)){ 
         pausableBuffer.regulatedSubject.onNext(value); 
        }else{ 
         pausableBuffer.isPaused=true; 
         pausableBuffer.buffer.add(value); 
        } 
       } 
      } 
     }); 
     return pausableBuffer.regulatedSubject.asObservable(); 
    } 

    public static void main(String[] args) { 
     BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(); 
     Observable<Integer> observable = PausableBuffer.<Integer>create(
       behaviorSubject.asObservable(),      
       intValue -> intValue==5 || 6<intValue);//continueCondition 
     observable.subscribe(v -> System.out.print(v+", ")); 
     for (int i = 0; i <= 8; i++) { 
      System.out.print("adding " + i + " : "); 
      behaviorSubject.onNext(i); 
      System.out.println(); 
     } 
    } 
} 

プリントアウト:0を追加

  • 1を加算:
  • 2を添加:
  • 3を添加:
  • 4を添加する:
  • 5追加:0 、1,2,3,4,5、
  • 、 、
  • 追加6:7追加
  • :6、7、8追加
  • :8、
+1

このカスタム「BufferUntil」演算子のようなものを探している可能性があります。http://stackoverflow.com/a/35890924/4096987 – AndroidEx

答えて

0

第二版:AndroidExに

public final class ContinueWhile<T> implements Observable.Operator<T, T> { 

final Func1<T, Boolean> continuePredicate; 

private ContinueWhile(Func1<T, Boolean> continuePredicate) { 
    this.continuePredicate = continuePredicate; 
} 

public static <T>ContinueWhile<T> create(Func1<T, Boolean> whileTrue){ 
    return new ContinueWhile<>(whileTrue); 
} 

@Override 
public Subscriber<? super T> call(Subscriber<? super T> child) { 
    ContinueWhileSubscriber parent = new ContinueWhileSubscriber(child); 
    child.add(parent); 
    return parent; 
} 

final class ContinueWhileSubscriber extends Subscriber<T> { 

    final Subscriber<? super T> actual; 
    Deque<T> buffer = new ConcurrentLinkedDeque<>(); 

    public ContinueWhileSubscriber(Subscriber<? super T> actual) { 
     this.actual = actual; 
    } 

    @Override 
    public void onNext(T t) { 
     buffer.add(t); 
     if (continuePredicate.call(t)) { 
      while(!buffer.isEmpty()) 
       actual.onNext(buffer.poll()); 
     } 
    } 

    @Override 
    public void onError(Throwable e) { 
     buffer = null; 
     actual.onError(e); 
    } 

    @Override 
    public void onCompleted() { 
     while (!buffer.isEmpty()) 
      actual.onNext(buffer.poll()); 
     buffer=null; 
     actual.onCompleted(); 
    } 
} 
} 




public static void main(String[] args) { 
    BehaviorSubject<Integer> subject = BehaviorSubject.create(); 
    subject.asObservable() 
      .doOnNext(v -> System.out.print("next ")) 
      .lift(ContinueWhile.create(i -> i%3==0)) 
      .subscribe(v -> System.out.print(v + ", ")); 
    for (int i = 0; i < 10; i++) { 
     subject.onNext(i); 
    } 
} 
} 

おかげ。