1

リアクティブストリームに関しては、パブリッシャが存在し、多数のサブスクライバを持つことができます。購読者は発行者として行動できますか?

ただし、購読者がPublisherからメッセージを受け取ったとします。このサブスクライバ(Subs1)はメッセージを変更/変更し、変更されたメッセージを消費する他のサブスクライバ(Subs2など)に渡します。

このSubs1サブスクライバは、新しいSubs2サブスクライバにメッセージを渡すことができるパブリッシャとして機能できますか?

可能かどうかは分かりませんが、シナリオは可能でしょうと思います。

可能であれば、これを行う方法を提案してください。

答えて

1

受信メッセージを変換して次のサブスクライバに渡す場合は、プロセッサインターフェイスを実装する必要があります。これは、メッセージを受信するためのサブスクライバと、メッセージを処理してそれ以降の処理のために送信するため、パブリッシャとして機能します。

ここでは、これを行うための完全な実装です:

  1. それは、サブスクライバとパブリッシャの両方として機能するよう、プロセッサを実装し、SubmissionPublisherを拡張MyTransformerクラスを作成します。

    import java.util.concurrent.Flow; 
    import java.util.concurrent.Flow.Subscription; 
    import java.util.concurrent.SubmissionPublisher; 
    import java.util.function.Function; 
    
    public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> { 
    
    private Function<T, R> function; 
    private Flow.Subscription subscription; 
    
    public MyTransformer(Function<T, R> function) { 
        super(); 
        this.function = function; 
    } 
    
    @Override 
    public void onComplete() { 
        System.out.println("Transformer Completed"); 
    } 
    
    @Override 
    public void onError(Throwable e) { 
        e.printStackTrace(); 
    } 
    
    @Override 
    public void onNext(T item) { 
        System.out.println("Transformer Got : "+item); 
        submit(function.apply(item)); 
        subscription.request(1); 
    
    } 
    
    @Override 
    public void onSubscribe(Subscription subscription) { 
        this.subscription = subscription; 
        subscription.request(1); 
    } 
    
    
    
    } 
    
  2. Subscriberインターフェイスを実装し、必要なメソッドを実装するTestSubscriberクラスを作成します。

処理が開始される前にonSubscribe()メソッドが呼び出されます。サブスクリプションのインスタンスが引数として渡されます。これは、サブスクライバとパブリッシャ間のメッセージの流れを制御するために使用されるクラスです。

ここでの主なメソッドはonNext()です。これは、パブリッシャが新しいメッセージをパブリッシュするたびに呼び出されます。

私たちは、Publisherインターフェイスを実装するSubmissionPublisherクラスを使用しています。

TestSubscriberが受け取るN個の要素をパブリッシャーに送信します。

TestSubscriberのインスタンスでclose()メソッドを呼び出していることに注意してください。指定されたパブリッシャのすべてのサブスクライバでonComplete()コールバックを呼び出します。

import java.util.LinkedList; 
import java.util.List; 
import java.util.concurrent.Flow.Subscriber; 
import java.util.concurrent.Flow.Subscription; 

public class TestSubscriber<T> implements Subscriber<T> { 

    private Subscription subscription; 

    public List<T> consumed = new LinkedList<>(); 

    @Override 
    public void onComplete() { 
     System.out.println("Subsciber Completed"); 
    } 

    @Override 
    public void onError(Throwable arg0) { 
     arg0.printStackTrace(); 
    } 

    @Override 
    public void onNext(T item) { 
     System.out.println("In Subscriber Got : "+item); 
     subscription.request(1); 

    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     this.subscription = subscription; 
     subscription.request(1); 
    } 

} 
ここ
  • パブリッシャが文字列要素を公開された流れを処理しています。
  • MyTransformerは文字列を整数として解析しています。つまり、ここで変換が必要です。

    import java.util.List; 
    import java.util.concurrent.SubmissionPublisher;; 
    
    public class TestTransformer { 
    
        public static void main(String... args) { 
         SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
         MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt); 
    
         TestSubscriber<Integer> subscriber = new TestSubscriber<>(); 
         List<String> items = List.of("1", "2", "3"); 
    
         List<Integer> expectedResult = List.of(1, 2, 3); 
    
         publisher.subscribe(transformProcessor); 
         transformProcessor.subscribe(subscriber); 
         items.forEach(publisher::submit); 
         publisher.close(); 
    
        } 
    } 
    
    関連する問題