私は複数のサービスを持っていますが、その中にはHystrixのHystrixObservableCommandを使って他のサービスを呼び出すサービスや、HystrixCommandを使用するサービスがあります。呼び出し元のサービスからtraceIdをHystrixObservableCommandのオブザーバブルに渡すにはどうしたらいいですか?また、フォールバックが呼び出された場合に渡されるようにするにはどうすればよいですか?Hystrix ObservablesでtraceIdsを渡すには?
すべてのサービスはgrpc-javaを使用しています。私が持っている
サンプルコード:
WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
String messageFromWorldService = "";
String idFromWorldService = "";
try {
Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
messageFromWorldService = greeterReply.getMessage();
idFromWorldService = greeterReply.getId();
logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
logger.warn("Exception when calling WorldService\n" + e);
}
WorldCommand.java
public class WorldCommand extends HystrixObservableCommand<Greeter.GreeterReply> {
private static final Logger logger = LoggerFactory.getLogger(WorldCommand.class.getName());
private final Greeter.GreeterRequest greeterRequest;
private final WorldServiceGrpc.WorldServiceStub worldServiceStub;
public WorldCommand(Greeter.GreeterRequest greeterRequest, WorldServiceGrpc.WorldServiceStub worldServiceStub) {
super(HystrixCommandGroupKey.Factory.asKey("WorldService"));
this.greeterRequest = greeterRequest;
this.worldServiceStub = worldServiceStub;
}
@Override
protected Observable<Greeter.GreeterReply> construct() {
Context context = Context.current();
return Observable.create(new Observable.OnSubscribe<Greeter.GreeterReply>() {
@Override
public void call(Subscriber<? super Greeter.GreeterReply> observer) {
logger.info("In WorldCommand");
if (!observer.isUnsubscribed()) {
//pass on the context, if you want only certain headers to pass on then create a new Context and attach it.
context.attach();
logger.info("In WorldCommand after attach");
worldServiceStub.greetWithHelloOrWorld(greeterRequest, new StreamObserver<Greeter.GreeterReply>() {
@Override
public void onNext(Greeter.GreeterReply greeterReply) {
logger.info("Response from WorldService -- {}, id = {}", greeterReply.getMessage(), greeterReply.getId());
observer.onNext(greeterReply);
observer.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.info("Exception from WorldService -- {}", t);
}
@Override
public void onCompleted() {
}
});
}
}
}).subscribeOn(Schedulers.io());
}
@Override
protected Observable<Greeter.GreeterReply> resumeWithFallback() {
logger.info("Response from fallback");
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("teammate").setId("-1").build();
return Observable.just(greeterReply);
}
私がログにTRACEIDとspanIdを印刷するには、トレースとMDCCurrentTraceContext grpc Zipkinを使用しています。
WorldCommandのログエントリは両方とも、トレースおよびスパンIDを出力しません.RxIoSchedulerスレッドで呼び出されます。
EDITマイクにより示唆されるように
追加ConcurrencyStrategy。
public class CustomHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private static final Logger log = LoggerFactory.getLogger(CustomHystrixConcurrencyStrategy.class);
public <T> Callable<T> wrapCallable(Callable<T> callable){
log.info("In CustomHystrixConcurrencyStrategy: callable="+ callable.toString());
return new ContextCallable<>(callable);
}
}
HelloServiceはワールドとチームの2つのサービスを呼び出します。 WorldCommandはHystrixObservableCommandであり、TeamCommandはHystrixCommandです。
logger.info("In the HelloService:greetWithHelloWorld");
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build();
//Call WorldService
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
//Async stub instead of blockingStub
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);
WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
String messageFromWorldService = "";
String idFromWorldService = "";
try {
Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get();
messageFromWorldService = greeterReply.getMessage();
idFromWorldService = greeterReply.getId();
logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
logger.warn("Exception when calling WorldService\n" + e);
}
//Call TeamService
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);
String messageFromTeamService = "";
String idFromTeamService = "";
try {
Greeter.GreeterReply greeterReply = teamCommand.construct().toBlocking().toFuture().get();
messageFromTeamService = greeterReply.getMessage();
idFromTeamService = greeterReply.getId();
logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService);
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) {
logger.warn("Exception when calling TeamService\n" + e);
}
assert(idFromWorldService.equals(idFromTeamService));
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
responseObserver.onNext(greeterReply);
responseObserver.onCompleted();
PreservableContextクラス
public class PreservableContexts {
//private final TraceContext traceContext;
private static final Logger logger = LoggerFactory.getLogger(PreservableContexts.class.getName());
public PreservableContexts() {
logger.info("Creating new PreservableContexts");
//this.traceContext = TraceContextHolder.getContext();
}
public void set() {
// if (traceContext != null) {
//TraceContextHolder.setContext(traceContext);
// }
}
public void clear() {
//TraceContextHolder.clearContext();
}
ログPreservableContexts中とCustomHystrixConcurrencyStrategyが印刷されない飽きません。私はHelloServerを起動するときにstartegyを登録しています。
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client");
//Async stub instead of blockingStub
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel);
WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub);
//Call TeamService
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client");
TeamServiceGrpc.TeamServiceStub teamServiceStub = TeamServiceGrpc.newStub(teamChannel);
//TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel);
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub);
try {
rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation());
rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation());
Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
@Override
public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) {
String messageFromWorldService = worldReply.getMessage();
String idFromWorldService = worldReply.getId();
logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService);
String messageFromTeamService = teamReply.getMessage();
String idFromTeamService = teamReply.getId();
logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService);
assert(idFromWorldService.equals(idFromTeamService));
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build();
logger.info("Final response=" + greeterReply.getMessage());
responseObserver.onNext(greeterReply);
responseObserver.onCompleted();
return null;
}
});
} catch (StatusRuntimeException e) {
logger.warn("Exception when calling WorldService and/or TeamService\n" + e);
}
私は今、奇妙な問題を抱えている、TeamCommandとWorldCommandへの呼び出しは、こののように完了しません:観測が設定されている方法を更新しました
HystrixConcurrencyStrategy strategy = new CustomHystrixConcurrencyStrategy();
HystrixPlugins.getInstance().registerConcurrencyStrategy(strategy);
context = HystrixRequestContext.initializeContext();
EDIT 2
コードは実行されません。
Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() {
@Override
public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) {
String messageFromWorldService = worldReply.getMessage();
また、フォールバックがある場合、hystrix-timerスレッドにはもうMDCがありません。
カスタムConcurrentStrategyクラスを作成し、ログを追加して呼び出されていることを確認しましたが、ログは出力されません。コメントでうまく書式を設定するのは難しいので、私はその質問を編集しました。何か案は?また、HystrixConcurrenyStrategyのJavadocは、HystrixCommandでの使用であり、HystrixObservableCommandではないことを言います。 '' '例えば、HystrixCommandによって実行されるすべてのCallableはwrapCallable(Callable)を呼び出してカスタム実装がCallableを追加動作で飾るチャンスを与えます。 '' ' カスタムConcurrentStrategyは両方のために機能しますか? – user2237511
[スレッド分離戦略](https://github.com/Netflix/Hystrix/wiki/How-it-Works#threads--thread-pools)を使用している場合、[同時実行戦略](https:// github .com/Netflix/Hystrix/wiki/plugins#concurrencystrategy)を適用する必要があります。セマフォの隔離についてはわかりませんが、とにかくそれを選ぶようには見えません。 これをローカルでテストしただけで、Observableコマンドがプラグインを起動します。 私はあなたが持っている問題は、どのようにコマンドを呼び出すのかと思います。 'teamCommand.construct()'の代わりに 'teamCommand.execute()'または 'teamCommand.queue()'を試してください。 – Mike
[同期実行](https://github.com/Netflix/Hystrix/wiki/How-To-Use#synchronous-execution)と[非同期実行]に関する手順(https://github.com/Netflix/Hystrix/) wiki/How-To-Use#asynchronous-execution)は、[How To Use](https://github.com/Netflix/Hystrix/wiki/How-To-Use)ページにあります。 – Mike