2016-09-02 6 views
0

私はいくつかの外部リソースへの4回の呼び出しのようないくつかの基本的なプロジェクトを持っています。私が達成したいのは、その呼び出しをHystrixObservableCommandにラップし、それを非同期的に呼び出すことです。Netflix Hystrix - HystrixObservableCommand非同期実行

HystrixObservableCommandオブジェクトで.observe()を呼び出した後、ラップされたロジックをすぐに非同期に呼び出す必要があります。しかし、それは同期して動作するため、私は間違ったことをしています。

例のコードでは、私は出力(今のところ)に興味がないので、出力はVoidです。それで、Observableをconstructor.observe()と呼ばれるオブジェクトに割り当てなかったのもその理由です。

@Component 
public class LoggerProducer { 

    private static final Logger LOGGER = Logger.getLogger(LoggerProducer.class); 

    @Autowired 
    SimpMessagingTemplate template; 

    private void push(Iterable<Message> messages, String topic) throws Exception { 
     template.convertAndSend("/messages/"+topic, messages); 
    } 

    public void splitAndPush(Iterable<Message> messages) { 

     Map<MessageTypeEnum, List<Message>> groupByMessageType = StreamSupport.stream(messages.spliterator(), true) 
       .collect(Collectors.groupingBy(Message::getType)); 

     //should be async - it's not 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.INFO), 
       MessageTypeEnum.INFO.toString().toLowerCase()).observe(); 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.WARN), 
       MessageTypeEnum.WARN.toString().toLowerCase()).observe(); 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.ERROR), 
       MessageTypeEnum.ERROR.toString().toLowerCase()).observe(); 

    } 

    class CommandPushToBrowser extends HystrixObservableCommand<Void> { 

     private Iterable<Message> messages; 
     private String messageTypeName; 

     public CommandPushToBrowser(Iterable<Message> messages, String messageTypeName) { 
      super(HystrixCommandGroupKey.Factory.asKey("Messages")); 
      this.messageTypeName = messageTypeName; 
      this.messages = messages; 
     } 

     @Override 
     protected Observable<Void> construct() { 
      return Observable.create(new Observable.OnSubscribe<Void>() { 

       @Override 
       public void call(Subscriber<? super Void> observer) { 
        try { 
         for (int i = 0 ; i < 50 ; i ++) { 
          LOGGER.info("Count: " + i + " messageType " + messageTypeName); 
         } 
         if (null != messages) { 
          push(messages, messageTypeName); 
          LOGGER.info("Message type: " + messageTypeName + " pushed: " + messages); 
         } 
         if (!observer.isUnsubscribed()) { 
          observer.onCompleted(); 
         } 
        } catch (Exception e) { 
         e.printStackTrace(); 
         observer.onError(e); 
        } 
       } 
      }); 
     } 
    } 
} 

私は問題を把握しようとしていたとして、いくつかの純粋な「テスト」のコードの断片は、そこにありますが、単に論理を無視し、主な焦点は、それが.observe()で非同期に実行することです。私は標準HystrixCommandでそれを達成できることを知っていますが、これは目標ではありません。

ホープ誰かが助け:) よろしく、

答えて

2

回答が見つかった:

は「観測は、自動的に並行処理を追加しないでください、あなたが観察して 同期、ブロックの実行をモデル化している場合は、それらがします。 が実行されます

subscribeOn(Schedulers.io())を使用してスレッドでスケジューリングすることで、非同期に簡単にすることができます。観察可能で ブロッキング呼び出しをラップするために:あなたは、ブロッキング呼び出しをラップしている場合には、それがために構築されているものだと、別のすべてを実行すること デフォルトとして https://speakerdeck.com/benjchristensen/applying-reactive-programming-with-rxjava-at-goto-chicago-2015?slide=33

しかし、あなただけのHystrixCommandを使用して を守らなければなりません糸。 を使用するHystrixCommand.observe()は、あなたが探している並行非同期 コンポジションを提供します。

HystrixObservableCommandは、非同期を包み込むための余分なスレッドを必要としない 非ブロック観測を目的としている「

- ベン・クリステンセン - Netflixのエッジエンジニアリング

出典:。https://groups.google.com/forum/#!topic/hystrixoss/g7ZLIudE8Rs