0

それはパブリッシャーとサブスクライバーを持っている、とサブスクライバがパブリッシャに加入すると、加入者はまた、次のメッセージを実装として、私は、Java 9他のユーザにメッセージを渡すには?

の反応ストリームAPIを学んでいます:

public class TestSubscriber<T> implements Subscriber<T> { 

    @Override 
    public void onComplete() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void onError(Throwable arg0) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void onNext(T arg0) { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void onSubscribe(Subscription arg0) { 
     // TODO Auto-generated method stub 

    } 

} 

私がしましたメッセージを他のサブスクライバに渡し/転送するサブスクライバのメソッドが見つかりませんでした。 提案がありますか?

+0

を、それがために何を必要とされなければなりませんか?私たちが[Subscription](https://docs.oracle.com/javase/9​​/docs/api/java/util/concurrent/Flow.Subscription.html)について話しているのであれば、Subscriberと一般的な出版社。 – nullpointer

+0

と言ってください。加入者がメッセージを変更/変更して別の加入者に渡す場合 – KayV

+1

まず、これらのhttps://stackoverflow.com/questions/47936868/can-a-subscriber-act-as-aをマージすることができます - パブリッシャーと詳細な質問に1つを保つ。 – nullpointer

答えて

1

これは、次のようにFlow.Processorを実装行うことができます。

import java.util.concurrent.Flow; 
import java.util.concurrent.Flow.Subscription; 
import java.util.concurrent.SubmissionPublisher; 
import java.util.function.Function; 

public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> { 

    private Function<T, R> function; 
    private Flow.Subscription subscription; 

    public MyTransformer(Function<T, R> function) { 
     super(); 
     this.function = function; 
    } 

    @Override 
    public void onComplete() { 
     System.out.println("Transformer Completed"); 
    } 

    @Override 
    public void onError(Throwable e) { 
     e.printStackTrace(); 
    } 

    @Override 
    public void onNext(T item) { 
     System.out.println("Transformer Got : "+item); 
     submit(function.apply(item)); 
     subscription.request(1); 

    } 

    @Override 
    public void onSubscribe(Subscription subscription) { 
     this.subscription = subscription; 
     subscription.request(1); 
    } 


} 

そして、次のように使用呼び出し:

興味深い
public class TestTransformer { 

    public static void main(String... args) { 
     SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
     MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt); 

     TestSubscriber<Integer> subscriber = new TestSubscriber<>(); 
     List<String> items = List.of("1", "2", "3"); 

     List<Integer> expectedResult = List.of(1, 2, 3); 

     publisher.subscribe(transformProcessor); 
     transformProcessor.subscribe(subscriber); 
     items.forEach(publisher::submit); 
     publisher.close(); 

    } 
} 
関連する問題