私はいくつかの外部リソースへの4回の呼び出しのようないくつかの基本的なプロジェクトを持っています。私が達成したいのは、その呼び出しをHystrixObservableCommand
にラップし、それを非同期的に呼び出すことです。Netflix Hystrix - HystrixObservableCommand非同期実行
HystrixObservableCommand
オブジェクトで.observe()
を呼び出した後、ラップされたロジックをすぐに非同期に呼び出す必要があります。しかし、それは同期して動作するため、私は間違ったことをしています。
例のコードでは、私は出力(今のところ)に興味がないので、出力はVoid
です。それで、Observableをconstructor.observe()
と呼ばれるオブジェクトに割り当てなかったのもその理由です。
@Component
public class LoggerProducer {
private static final Logger LOGGER = Logger.getLogger(LoggerProducer.class);
@Autowired
SimpMessagingTemplate template;
private void push(Iterable<Message> messages, String topic) throws Exception {
template.convertAndSend("/messages/"+topic, messages);
}
public void splitAndPush(Iterable<Message> messages) {
Map<MessageTypeEnum, List<Message>> groupByMessageType = StreamSupport.stream(messages.spliterator(), true)
.collect(Collectors.groupingBy(Message::getType));
//should be async - it's not
new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.INFO),
MessageTypeEnum.INFO.toString().toLowerCase()).observe();
new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.WARN),
MessageTypeEnum.WARN.toString().toLowerCase()).observe();
new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.ERROR),
MessageTypeEnum.ERROR.toString().toLowerCase()).observe();
}
class CommandPushToBrowser extends HystrixObservableCommand<Void> {
private Iterable<Message> messages;
private String messageTypeName;
public CommandPushToBrowser(Iterable<Message> messages, String messageTypeName) {
super(HystrixCommandGroupKey.Factory.asKey("Messages"));
this.messageTypeName = messageTypeName;
this.messages = messages;
}
@Override
protected Observable<Void> construct() {
return Observable.create(new Observable.OnSubscribe<Void>() {
@Override
public void call(Subscriber<? super Void> observer) {
try {
for (int i = 0 ; i < 50 ; i ++) {
LOGGER.info("Count: " + i + " messageType " + messageTypeName);
}
if (null != messages) {
push(messages, messageTypeName);
LOGGER.info("Message type: " + messageTypeName + " pushed: " + messages);
}
if (!observer.isUnsubscribed()) {
observer.onCompleted();
}
} catch (Exception e) {
e.printStackTrace();
observer.onError(e);
}
}
});
}
}
}
私は問題を把握しようとしていたとして、いくつかの純粋な「テスト」のコードの断片は、そこにありますが、単に論理を無視し、主な焦点は、それが.observe()
で非同期に実行することです。私は標準HystrixCommand
でそれを達成できることを知っていますが、これは目標ではありません。
ホープ誰かが助け:) よろしく、