2017-02-08 16 views
3

ElementAdded(A)、ElementRemoved(R)、ActionStarted(S)、ActionFinished(F)のイベントを観測しています。 AddsとRemoveのいくつかは、ActionStartedとActionFinishedの間に挟まれています。私は、イベントのサブシーケンスを単一のイベントElementMoved(M)に置き換え、非サンドイッチイベントを遅滞なく飛ばすようにしたい。 ElementMovedイベントには、置き換えられるすべてのイベントを含む配列が含まれている必要があります。RxJava:要素のサブシーケンスを1つの要素に置き換えます。

---A--A--R--S-A-R-F-R-A-A-- 
    (my transformation) 
---A--A--R--------M-R-A-A-- 

ElementMovedがActionFinishedイベントが発生した瞬間に表示されるべきである: はここでの例です。何ActionFinishedイベントが最後に挟まれたイベント以来、タイムアウトTの後に解雇されていない場合

さらに、その後、すべてのオリジナルのイベントではなく、火災必要があります。

     -----T 
---A1--A2--R3--S4-A5-R6------------R7-A8-A9-- 
    (my transformation) 
---A1--A2--R3---------------S4A5R6-R7-A8-A9-- 

をタイムアウト後に解雇されActionFinishedイベントがあるかもしれませんか例のように起こることはありません。それが決して起こらなければ、何もすることはありません。それは起こり、ウィンドウが開いていないので、それを新しいストリームに単独で作成するActionFinishedイベントが発生します。たとえば、次の変換が指定されたタイムアウトにウィンドウを閉じることができない場合

     -----T 
---A1--A2--R3--S4-A5-R6------------F7-A8-A9-- 
    (my transformation) 
---A1--A2--R3---------------S4A5R6-F7-A8-A9-- 

基本的に、それはそのまま全て保留イベントをフラッシュしなければなりません。

このようなイベントのフラッシュは、対応するFイベントの前に新しいSイベントが発生した場合にも発生します。 (この新しいSイベントは、上記のロジックに従って保留されるべきです)。例えば、

---A1--A2--R3--S4-A5-R6--S7---R9-A9-A10-F11-A12-- 
    (my transformation) 
---A1--A2--R3------------S4A5R6---------M7- A12-- 

私は幸運を伴ってしばらくの間、ウィンドウのオペレーターと遊んできました。バッファー演算子はフリーフローティングイベントの遅延を導入しますが、私の場合は受け入れられません。スキャンは元のストリームと同じくらい多くのイベントを発生させますが、これは私が望むものではありません。私は確かに失われているので、どんな助けも非常に高く評価されるだろう。

編集1:ウィンドウが開いている間に新しいSイベントが表示されたら、洗い流す程度を追加しました場合

編集2:移動のイベントは、それが交換されたイベントのリストが含まれている必要があることを明確にします。

編集3:RX-Javaから変更タグは、RX-のJava2する

編集4:ActionFinishedイベントが来る場合には、タイムアウトキックの後に何が起こるのかを明確に

ありがとうございます!

+0

S A FまたはS R Fをお持ちの場合はどうなりますか?これらのパターンは許可されていますか? – akarnokd

+0

はい、SとFの間のAイベントとRイベントの任意のシーケンスが許可されます(ゼロAイベントとRイベントを含む)。 – luisobo

+0

上記の2つの編集を行いました。 – luisobo

答えて

2

私の最後の回答が「レビュー担当者」によって削除されて以来、ここでは完全なソースコードを使用した回答です。これが長いコード部分のために削除された場合、私は何をすべきか分かりません。 OPの問題が複雑にオペレータを必要とすることに注意:

package hu.akarnokd.rxjava; 

import java.util.*; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicLong; 

import io.reactivex.internal.util.BackpressureHelper; 
import org.reactivestreams.*; 

import io.reactivex.*; 
import io.reactivex.Scheduler.Worker; 
import io.reactivex.disposables.*; 
import io.reactivex.schedulers.Schedulers; 

public class Main { 

    public static void main(String[] args) { 
     Flowable<String> source = Flowable.just(
       "A", "A", "R", "S", "A", "R", "F", "R", "A", "A"); 

     source.lift(new ConditionalCompactor(
       500, TimeUnit.SECONDS, Schedulers.computation())) 
       .subscribe(System.out::println, Throwable::printStackTrace); 

    } 

    static final class ConditionalCompactor implements FlowableOperator<String, String> { 
     final Scheduler scheduler; 

     final long timeout; 

     final TimeUnit unit; 

     ConditionalCompactor(long timeout, TimeUnit unit, 
          Scheduler scheduler) { 
      this.scheduler = scheduler; 
      this.timeout = timeout; 
      this.unit = unit; 
     } 

     @Override 
     public Subscriber<? super String> apply(Subscriber<? super String> t) { 
      return new ConditionalCompactorSubscriber(
        t, timeout, unit, scheduler.createWorker()); 
     } 

     static final class ConditionalCompactorSubscriber 
       implements Subscriber<String>, Subscription { 
      final Subscriber<? super String> actual; 

      final Worker worker; 

      final long timeout; 

      final TimeUnit unit; 

      final AtomicInteger wip; 

      final SerialDisposable mas; 

      final Queue<String> queue; 

      final List<String> batch; 

      final AtomicLong requested; 

      Subscription s; 

      static final Disposable NO_TIMER; 
      static { 
       NO_TIMER = Disposables.empty(); 
       NO_TIMER.dispose(); 
      } 

      volatile boolean done; 
      Throwable error; 

      boolean compacting; 

      int lastLength; 

      ConditionalCompactorSubscriber(Subscriber<? super String> actual, 
              long timeout, TimeUnit unit, Worker worker) { 
       this.actual = actual; 
       this.worker = worker; 
       this.timeout = timeout; 
       this.unit = unit; 
       this.batch = new ArrayList<>(); 
       this.wip = new AtomicInteger(); 
       this.mas = new SerialDisposable(); 
       this.mas.set(NO_TIMER); 
       this.queue = new ConcurrentLinkedQueue<>(); 
       this.requested = new AtomicLong(); 
      } 

      @Override 
      public void onSubscribe(Subscription s) { 
       this.s = s; 
       actual.onSubscribe(this); 
      } 

      @Override 
      public void onNext(String t) { 
       queue.offer(t); 
       drain(); 
      } 

      @Override 
      public void onError(Throwable e) { 
       error = e; 
       done = true; 
       drain(); 
      } 

      @Override 
      public void onComplete() { 
       done = true; 
       drain(); 
      } 

      @Override 
      public void cancel() { 
       s.cancel(); 
       worker.dispose(); 
      } 

      @Override 
      public void request(long n) { 
       BackpressureHelper.add(requested, n); 
       s.request(n); 
       drain(); 
      } 

      void drain() { 
       if (wip.getAndIncrement() != 0) { 
        return; 
       } 
       int missed = 1; 
       for (;;) { 

        long r = requested.get(); 
        long e = 0L; 

        while (e != r) { 
         boolean d = done; 
         if (d && error != null) { 
          queue.clear(); 
          actual.onError(error); 
          worker.dispose(); 
          return; 
         } 
         String s = queue.peek(); 
         if (s == null) { 
          if (d) { 
           actual.onComplete(); 
           worker.dispose(); 
           return; 
          } 
          break; 
         } 

         if (compacting) { 
          batch.clear(); 
          batch.addAll(queue); 
          int n = batch.size(); 
          String last = batch.get(n - 1); 
          if ("S".equals(last)) { 
           if (n > 1) { 
            actual.onNext(queue.poll()); 
            mas.set(NO_TIMER); 
            lastLength = -1; 
            compacting = false; 
            e++; 
            continue; 
           } 
           // keep the last as the start of the new 
           if (lastLength <= 0) { 
            lastLength = 1; 
            mas.set(worker.schedule(() -> { 
             queue.offer("T"); 
             drain(); 
            }, timeout, unit)); 
            this.s.request(1); 
           } 
           break; 
          } else 
          if ("T".equals(last)) { 
           actual.onNext(queue.poll()); 
           compacting = false; 
           mas.set(NO_TIMER); 
           lastLength = -1; 
           e++; 
           continue; 
          } else 
          if ("F".equals(last)) { 
           actual.onNext("M"); 
           while (n-- != 0) { 
            queue.poll(); 
           } 
           compacting = false; 
           mas.set(NO_TIMER); 
           lastLength = -1; 
           e++; 
          } else { 
           if (lastLength != n) { 
            lastLength = n; 
            mas.set(worker.schedule(() -> { 
             queue.offer("T"); 
             drain(); 
            }, timeout, unit)); 
            this.s.request(1); 
           } 
           break; 
          } 
         } else { 
          if ("A".equals(s) || "F".equals(s) || "R".equals(s)) { 
           queue.poll(); 
           actual.onNext(s); 
           e++; 
          } else 
          if ("T".equals(s)) { 
           // ignore timeout markers outside the compacting mode 
           queue.poll(); 
          } else { 
           compacting = true; 
          } 
         } 
        } 

        if (e != 0L) { 
         BackpressureHelper.produced(requested, e); 
        } 

        if (e == r) { 
         if (done) { 
          if (error != null) { 
           queue.clear(); 
           actual.onError(error); 
           worker.dispose(); 
           return; 
          } 
          if (queue.isEmpty()) { 
           actual.onComplete(); 
           worker.dispose(); 
           return; 
          } 
         } 
        } 

        missed = wip.addAndGet(-missed); 
        if (missed == 0) { 
         break; 
        } 
       } 
      } 
     } 
    } 
} 

オペレータのパターンは、典型的なキュー・ドレインであるが、排出相は、異なる動作モードを必要とする特定の後続のパターンを組み合わせるためのロジックを含みます。

編集は、背圧のサポートで更新RxJava 2.

編集2に更新します。

+0

を参照してくださいこれは素晴らしいです、ありがとうございます。私は、背圧を安定させるための指針があれば疑問に思っていました。私は、オペレータがバッファリングするたびに1つ上流に要求する必要があると仮定します。問題は、タイムアウトやウィンドウの破損のためにオペレーターがバッファーをフラッシュする場合です。要求された数のアイテムだけをフラッシュする必要があります。再度、感謝します。 – luisobo

+0

私はhttps://github.com/luisobo/rxjava-conditional-compacted-operator/pull/2を参照しているケースで2つの失敗したテストを書いた – luisobo

+0

はい、バックプレッシャーは少し複雑でテストにパターンが必要です。デバッグはあなたにとってより良い立場にあります。テストケースを見て、解決策を試してみましょう。 – akarnokd

関連する問題