2017-05-18 13 views
1

私はイベントを送信するシナリオをテストしていますが、消費者がフローを続けるために処理を完了したことを監視しています。 rxJavaを使用して、消費者処理の終わりまでブロックされるメインスレッドObservable、私は観察可能な結果を​​待つようにメインスレッドをロックすることに成功しませんでした。Observableが完了するまでスレッドをロックする方法

私のプロデューサー

@Service 
public class Producer { 

    private MessageChannel output; 

    @Autowired 
    private Consumer consumer; 

    @Autowired 
    public Producer(Processor processor) { 
     this.output = processor.output(); 
    } 

    public void send(String event) { 

     System.out.println("SENDING EVENT..."); 

     output.send(MessageBuilder.withPayload(event).build()); 

     //Observable<Boolean> obs = consumer.execute(); 
     //obs.subscribe(); 

     //Blocking process 
     BlockingObservable.from(consumer.execute()).subscribe(); 

     //Continue to flow 
     System.out.println("EVENT PROCESSED..."); 

    } 
} 

マイ消費者

@Service 
public class Consumer { 

    @StreamListener(target = Processor.INPUT) 
    public void receiver(@Payload String event){ 

     System.out.println("EVENT RECEIVED, PROCESSING..."); 
     try { 
      TimeUnit.SECONDS.sleep(10); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     execute(); 

    } 

    public Observable<Boolean> execute() { 
     return Observable.<Boolean>create(emitter -> { 
      try { 
       System.out.println("EVENT STILL PROCESSING..."); 
       emitter.onNext(Boolean.TRUE); 
      } catch (Exception e) { 
       emitter.onError(new RuntimeException("ERROR")); 
      } 
      emitter.onCompleted(); 
     }); 
    } 
} 

答えて

1

代わりBlockingObservable.from(consumer.execute()).subscribe()のスレッドをブロックするBlockingObservable.toFuture(consumer.execute()).get()を使用することができます。

1

値を取得するには、演算子toBlockig(コンシューマに消費を待つ)+ singleを使用する必要があります。

@Test 
    public void observableEvolveAndReturnToStringValue() { 
     assertTrue(Observable.just(10) 
          .map(String::valueOf) 
          .toBlocking() 
          .single() 
          .equals("10")); 
    } 

あなたがここでより多くの例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/utils/ObservableToBlocking.java

を見ることができます
関連する問題