2017-11-14 10 views
0

私は複数のサービスを持っていますが、その中にはHystrixのHystrixObservableCommandを使って他のサービスを呼び出すサービスや、HystrixCommandを使用するサービスがあります。呼び出し元のサービスからtraceIdをHystrixObservableCommandのオブザーバブルに渡すにはどうしたらいいですか?また、フォールバックが呼び出された場合に渡されるようにするにはどうすればよいですか?Hystrix ObservablesでtraceIdsを渡すには?

すべてのサービスはgrpc-javaを使用しています。私が持っている

サンプルコード:

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 
     String messageFromWorldService = ""; 
     String idFromWorldService = ""; 
     try { 

      Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get(); 
      messageFromWorldService = greeterReply.getMessage(); 
      idFromWorldService = greeterReply.getId(); 
      logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 
     } catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
      logger.warn("Exception when calling WorldService\n" + e); 
     } 

WorldCommand.java

public class WorldCommand extends HystrixObservableCommand<Greeter.GreeterReply> { 

    private static final Logger logger = LoggerFactory.getLogger(WorldCommand.class.getName()); 

    private final Greeter.GreeterRequest greeterRequest; 
    private final WorldServiceGrpc.WorldServiceStub worldServiceStub; 

    public WorldCommand(Greeter.GreeterRequest greeterRequest, WorldServiceGrpc.WorldServiceStub worldServiceStub) { 
     super(HystrixCommandGroupKey.Factory.asKey("WorldService")); 
     this.greeterRequest = greeterRequest; 
     this.worldServiceStub = worldServiceStub; 
    } 

    @Override 
    protected Observable<Greeter.GreeterReply> construct() { 
     Context context = Context.current(); 
     return Observable.create(new Observable.OnSubscribe<Greeter.GreeterReply>() { 
      @Override 
      public void call(Subscriber<? super Greeter.GreeterReply> observer) { 
       logger.info("In WorldCommand"); 
       if (!observer.isUnsubscribed()) { 
        //pass on the context, if you want only certain headers to pass on then create a new Context and attach it. 
        context.attach(); 
        logger.info("In WorldCommand after attach"); 
        worldServiceStub.greetWithHelloOrWorld(greeterRequest, new StreamObserver<Greeter.GreeterReply>() { 
         @Override 
         public void onNext(Greeter.GreeterReply greeterReply) { 
          logger.info("Response from WorldService -- {}, id = {}", greeterReply.getMessage(), greeterReply.getId()); 
          observer.onNext(greeterReply); 
          observer.onCompleted(); 
         } 

         @Override 
         public void onError(Throwable t) { 
          logger.info("Exception from WorldService -- {}", t); 
         } 

         @Override 
         public void onCompleted() { 

         } 
        }); 
       } 
      } 
     }).subscribeOn(Schedulers.io()); 
    } 

    @Override 
    protected Observable<Greeter.GreeterReply> resumeWithFallback() { 
     logger.info("Response from fallback"); 
     Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("teammate").setId("-1").build(); 
     return Observable.just(greeterReply); 
    } 

私がログにTRACEIDとspanIdを印刷するには、トレースとMDCCurrentTraceContext grpc Zipkinを使用しています。

WorldCommandのログエントリは両方とも、トレースおよびスパンIDを出力しません.RxIoSchedulerスレッドで呼び出されます。

EDITマイクにより示唆されるように

追加ConcurrencyStrategy。

public class CustomHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { 

    private static final Logger log = LoggerFactory.getLogger(CustomHystrixConcurrencyStrategy.class); 

    public <T> Callable<T> wrapCallable(Callable<T> callable){ 
     log.info("In CustomHystrixConcurrencyStrategy: callable="+ callable.toString()); 
     return new ContextCallable<>(callable); 
    } 
} 

HelloServiceはワールドとチームの2つのサービスを呼び出します。 WorldCommandはHystrixObservableCommandであり、TeamCommandはHystrixCommandです。

logger.info("In the HelloService:greetWithHelloWorld"); 
Greeter.GreeterRequest greeterRequest = Greeter.GreeterRequest.newBuilder().setId(request.getId()).build(); 

//Call WorldService 
ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client"); 
//Async stub instead of blockingStub 
WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel); 

WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 
String messageFromWorldService = ""; 
String idFromWorldService = ""; 
try { 

    Greeter.GreeterReply greeterReply = worldCommand.construct().toBlocking().toFuture().get(); 
    messageFromWorldService = greeterReply.getMessage(); 
    idFromWorldService = greeterReply.getId(); 
    logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
    logger.warn("Exception when calling WorldService\n" + e); 
} 

//Call TeamService 
ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client"); 
TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel); 
TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub); 

String messageFromTeamService = ""; 
String idFromTeamService = ""; 
try { 
    Greeter.GreeterReply greeterReply = teamCommand.construct().toBlocking().toFuture().get(); 
    messageFromTeamService = greeterReply.getMessage(); 
    idFromTeamService = greeterReply.getId(); 
    logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService); 
} catch (StatusRuntimeException | InterruptedException | ExecutionException e) { 
    logger.warn("Exception when calling TeamService\n" + e); 
} 

assert(idFromWorldService.equals(idFromTeamService)); 
Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build(); 
responseObserver.onNext(greeterReply); 
responseObserver.onCompleted(); 

PreservableContextクラス

public class PreservableContexts { 

    //private final TraceContext traceContext; 
    private static final Logger logger = LoggerFactory.getLogger(PreservableContexts.class.getName()); 

    public PreservableContexts() { 
     logger.info("Creating new PreservableContexts"); 
     //this.traceContext = TraceContextHolder.getContext(); 
    } 

    public void set() { 
     // if (traceContext != null) { 
      //TraceContextHolder.setContext(traceContext); 
     // } 
    } 

    public void clear() { 
     //TraceContextHolder.clearContext(); 
    } 

ログPreservableContexts中とCustomHystrixConcurrencyStrategyが印刷されない飽きません。私はHelloServerを起動するときにstartegyを登録しています。

ManagedChannel worldChannel = getChannel("localhost:8081", "helloService-world-client"); 
    //Async stub instead of blockingStub 
    WorldServiceGrpc.WorldServiceStub worldServiceStub = WorldServiceGrpc.newStub(worldChannel); 
    WorldCommand worldCommand = new WorldCommand(greeterRequest, worldServiceStub); 

    //Call TeamService 
    ManagedChannel teamChannel = getChannel("localhost:8082", "helloService-team-client"); 
    TeamServiceGrpc.TeamServiceStub teamServiceStub = TeamServiceGrpc.newStub(teamChannel); 
    //TeamServiceGrpc.TeamServiceBlockingStub teamServiceStub = TeamServiceGrpc.newBlockingStub(teamChannel); 
    TeamCommand teamCommand = new TeamCommand(greeterRequest, teamServiceStub); 

    try { 
     rx.Observable<Greeter.GreeterReply> worldReplyObservable = worldCommand.observe().subscribeOn(Schedulers.computation()); 
     rx.Observable<Greeter.GreeterReply> teamReplyObservable = teamCommand.observe().subscribeOn(Schedulers.computation()); 
     Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() { 
      @Override 
      public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) { 
       String messageFromWorldService = worldReply.getMessage(); 
       String idFromWorldService = worldReply.getId(); 
       logger.info("Response from WorldService -- {}, id = {}", messageFromWorldService, idFromWorldService); 

       String messageFromTeamService = teamReply.getMessage(); 
       String idFromTeamService = teamReply.getId(); 
       logger.info("Response from TeamService -- {}, id = {}", messageFromTeamService, idFromTeamService); 

       assert(idFromWorldService.equals(idFromTeamService)); 
       Greeter.GreeterReply greeterReply = Greeter.GreeterReply.newBuilder().setMessage("Hello" + messageFromWorldService + " from " + messageFromTeamService).setId(idFromWorldService).build(); 
       logger.info("Final response=" + greeterReply.getMessage()); 
       responseObserver.onNext(greeterReply); 
       responseObserver.onCompleted(); 
       return null; 
      } 
     }); 
    } catch (StatusRuntimeException e) { 
     logger.warn("Exception when calling WorldService and/or TeamService\n" + e); 
    } 

私は今、奇妙な問題を抱えている、TeamCommandとWorldCommandへの呼び出しは、こののように完了しません:観測が設定されている方法を更新しました

HystrixConcurrencyStrategy strategy = new CustomHystrixConcurrencyStrategy(); 
     HystrixPlugins.getInstance().registerConcurrencyStrategy(strategy); 
     context = HystrixRequestContext.initializeContext(); 

EDIT 2

コードは実行されません。

Observable.zip(worldReplyObservable, teamReplyObservable, new Func2<Greeter.GreeterReply, Greeter.GreeterReply, Object>() { 
       @Override 
       public Object call(Greeter.GreeterReply worldReply, Greeter.GreeterReply teamReply) { 
        String messageFromWorldService = worldReply.getMessage(); 

また、フォールバックがある場合、hystrix-timerスレッドにはもうMDCがありません。

答えて

0

私はhysterixに関する知識があまりありませんが、トレースIDのようなコンテキスト情報を渡そうとすると、io.grpc.Contextが正しいクラスになります。 traceIDを使用して新しいコンテキストを作成するには、context.withValueに電話する必要があります。データが必要な場所では、コンテキストを添付する必要があります。また、完了時にコンテキストを切り離すようにしてください。私はあなたのスニペットで起こっていることはわかりません。独自のCallableを使用するカスタムHystrixConcurrencyStrategyを登録する

0

あなたが使用する必要があります...

HystrixPlugins.getInstance().registerConcurrencyStrategy(...)

... ...

public class ConcurrencyStrategy extends HystrixConcurrencyStrategy {  
    @Override 
    public <K> Callable<K> wrapCallable(Callable<K> c) { 
     return new ContextCallable<>(c); 
    } 
} 

...それはサーキットの周りのコンテキスト保存を適用する...

public class ContextCallable<K> implements Callable<K> { 

    private final Callable<K> callable; 
    private final PreservableContexts contexts; 

    public ContextCallable(Callable<K> actual) { 
     this.callable = actual; 
     this.contexts = new PreservableContexts(); 
    } 

    @Override 
    public K call() throws Exception { 
     contexts.set(); 
     try { 
      return callable.call(); 
     } finally { 
      contexts.clear(); 
     } 
    } 
} 

... Zipkinコンテキストを保存できるヘルパークラスである経て...

public class PreservableContexts { 

    private final TraceContext traceContext; 

    public PreservableContexts() { 
     this.traceContext = TraceContextHolder.getContext(); 
    } 

    public void set() { 
     if (traceContext != null) { 
      TraceContextHolder.setContext(traceContext); 
     } 
    } 

    public void clear() { 
     TraceContextHolder.clearContext(); 
    } 

} 

...あなたが保存したいかもしれない他のコンテキストを追加する簡単な方法を許可するMDC、SecurityContextなど...

+0

カスタムConcurrentStrategyクラスを作成し、ログを追加して呼び出されていることを確認しましたが、ログは出力されません。コメントでうまく書式を設定するのは難しいので、私はその質問を編集しました。何か案は?また、HystrixConcurrenyStrategyのJavadocは、HystrixCommandでの使用であり、HystrixObservableCommandではないことを言います。 '' '例えば、HystrixCommandによって実行されるすべてのCallableはwrapCallable(Callable)を呼び出してカスタム実装がCallableを追加動作で飾るチャンスを与えます。 '' ' カスタムConcurrentStrategyは両方のために機能しますか? – user2237511

+0

[スレッド分離戦略](https://github.com/Netflix/Hystrix/wiki/How-it-Works#threads--thread-pools)を使用している場合、[同時実行戦略](https:// github .com/Netflix/Hystrix/wiki/plugins#concurrencystrategy)を適用する必要があります。セマフォの隔離についてはわかりませんが、とにかくそれを選ぶようには見えません。 これをローカルでテストしただけで、Observableコマンドがプラグインを起動します。 私はあなたが持っている問題は、どのようにコマンドを呼び出すのかと思います。 'teamCommand.construct()'の代わりに 'teamCommand.execute()'または 'teamCommand.queue()'を試してください。 – Mike

+0

[同期実行](https://github.com/Netflix/Hystrix/wiki/How-To-Use#synchronous-execution)と[非同期実行]に関する手順(https://github.com/Netflix/Hystrix/) wiki/How-To-Use#asynchronous-execution)は、[How To Use](https://github.com/Netflix/Hystrix/wiki/How-To-Use)ページにあります。 – Mike

関連する問題