受信メッセージを変換して次のサブスクライバに渡す場合は、プロセッサインターフェイスを実装する必要があります。これは、メッセージを受信するためのサブスクライバと、メッセージを処理してそれ以降の処理のために送信するため、パブリッシャとして機能します。
ここでは、これを行うための完全な実装です:
それは、サブスクライバとパブリッシャの両方として機能するよう、プロセッサを実装し、SubmissionPublisherを拡張MyTransformerクラスを作成します。
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public MyTransformer(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onComplete() {
System.out.println("Transformer Completed");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(T item) {
System.out.println("Transformer Got : "+item);
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
Subscriberインターフェイスを実装し、必要なメソッドを実装するTestSubscriberクラスを作成します。
処理が開始される前にonSubscribe()メソッドが呼び出されます。サブスクリプションのインスタンスが引数として渡されます。これは、サブスクライバとパブリッシャ間のメッセージの流れを制御するために使用されるクラスです。
ここでの主なメソッドはonNext()です。これは、パブリッシャが新しいメッセージをパブリッシュするたびに呼び出されます。
私たちは、Publisherインターフェイスを実装するSubmissionPublisherクラスを使用しています。
TestSubscriberが受け取るN個の要素をパブリッシャーに送信します。
TestSubscriberのインスタンスでclose()メソッドを呼び出していることに注意してください。指定されたパブリッシャのすべてのサブスクライバでonComplete()コールバックを呼び出します。
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
public class TestSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumed = new LinkedList<>();
@Override
public void onComplete() {
System.out.println("Subsciber Completed");
}
@Override
public void onError(Throwable arg0) {
arg0.printStackTrace();
}
@Override
public void onNext(T item) {
System.out.println("In Subscriber Got : "+item);
subscription.request(1);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
ここ
- パブリッシャが文字列要素を公開された流れを処理しています。
MyTransformerは文字列を整数として解析しています。つまり、ここで変換が必要です。
import java.util.List;
import java.util.concurrent.SubmissionPublisher;;
public class TestTransformer {
public static void main(String... args) {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
}
}