2017-09-30 5 views
5

もう一度RxJavaとJava 9 Flowを比較しています。私はFlowがデフォルトでは非同期であることを知り、それを同期させる方法があるのだろうかと思っていました。メインスレッドで実行フロー

Nio用ではなく、砂糖構文用に使いたい場合があり、より均質なコードが必要な場合もあります。

デフォルトでは、RxJavaは同期しており、パイプラインでobserverOnsubscribeOnを使用して非同期に実行させることができます。

Flowにメインスレッドで実行される演算子がありますか?

よろしくお願いいたします。

答えて

5

Publisherは、Flowに記載されているように、同期実行を使用して定義することができます。

単一のサブスクライバに単一の TRUEアイテムを発行(要求された場合)する非常に単純な発行者です。サブスクライバは単一のアイテム のみを受信するため、このクラスではバッファリングと順序制御は使用されません。

class OneShotPublisher implements Publisher<Boolean> { 
    private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based 
    private boolean subscribed; // true after first subscribe 
    public synchronized void subscribe(Subscriber<? super Boolean> subscriber) { 
    if (subscribed) 
     subscriber.onError(new IllegalStateException()); // only one allowed 
    else { 
     subscribed = true; 
     subscriber.onSubscribe(new OneShotSubscription(subscriber, executor)); 
    } 
    } 
    static class OneShotSubscription implements Subscription { 
    private final Subscriber<? super Boolean> subscriber; 
    private final ExecutorService executor; 
    private Future<?> future; // to allow cancellation 
    private boolean completed; 
    OneShotSubscription(Subscriber<? super Boolean> subscriber, 
         ExecutorService executor) { 
     this.subscriber = subscriber; 
     this.executor = executor; 
    } 
    public synchronized void request(long n) { 
     if (n != 0 && !completed) { 
     completed = true; 
     if (n < 0) { 
      IllegalArgumentException ex = new IllegalArgumentException(); 
      executor.execute(() -> subscriber.onError(ex)); 
     } else { 
      future = executor.submit(() -> { 
      subscriber.onNext(Boolean.TRUE); 
      subscriber.onComplete(); 
      }); 
     } 
     } 
    } 
    public synchronized void cancel() { 
     completed = true; 
     if (future != null) future.cancel(false); 
    } 
    } 
} 
+2

詳細なコードをお寄せいただきありがとうございます。 – paul

3

はそうする何のオペレータがありませんが、APIを使用すると、アイテムが公開されている方法を制御することができます。したがって、現在のスレッドから直接サブスクライバメソッドを呼び出すことができます。

class SynchronousPublisher implements Publisher<Data> { 
     public synchronized void subscribe(Subscriber<? super Data> subscriber) { 
      subscriber.onSubscribe(new SynchronousSubscription(subscriber)); 
     } 
} 
static class SynchronousSubscription implements Subscription { 
     private final Subscriber<? super Data> subscriber; 

     SynchronousSubscription(Subscriber<? super Data> subscriber) { 
      this.subscriber = subscriber; 
     } 
     public synchronized void request(long n) { 
      ... // prepare item    
      subscriber.onNext(someItem);  
     } 

     ... 
    } 
} 
+0

ありがとう、これはまた、まだ非常に冗長な良い回避策ですが、ちょっと!これはJavaです:) – paul

+1

@paul JavaはこれをRS仕様(実装なし)として提供しています。これはRxJavaの代替手段ではありません。仕様として動作するために最低限必要なもののみ。 – manouti

2

これは、メインスレッドで実行することによって何を意味するかによって異なります。

特定のスレッドで任意のFlowを強制的に実行したい場合は、非同期提供部分をオーバーライドさせるライブラリにFlowが実装されていない限り、標準的な方法はありません。 RxJavaの用語では、これらは、Schedulersユーティリティクラスによって提供されるSchedulerです。

メインスレッドでFlowを観察したい場合は、キューに項目があるまでスレッドをブロックするブロックキューコンシューマをFlow.Subscriberの上に記述する必要があります。これは複雑になる可能性がありますので、blockingSubscribeの実装をReactive4JavaFlowに紹介します。

JavaメインスレッドをExecutor/Schedulerとして使用する場合は、これはさらに複雑で、同様のブロックメカニズムとスレッドプールエグゼキュータのアイデアが必要です。 Reactive4JavaFlowには、このようなスケジューラがあります。これは、new SubmissionPublisher<>(128, blockingScheduler::schedule)経由でエグゼキュータとして使用できます。

関連する問題