2017-01-04 11 views
0

私はProject Reactorに関する3つの質問があります。私が持っているコードから始めてください(問題を分かりやすくするために簡略化されています)。 プロジェクトの原子炉のタイムアウト処理

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) { 
    return Mono.just("hello") 
     .compose(monostr -> monostr 
      .doOnSuccess(str -> System.out.println("Suppose I want to release session here after all")) //(1) 
      .doOnCancel(() -> System.out.println("cancelled")) //(2) 
      .then(callback::apply) 
      .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout))) 
     ); 
} 

とテスト:だから

@Test 
public void testDoWithSession2() throws Exception { 
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> { 
    System.out.println("do some long timed work"); 
    try { 
     Thread.sleep(5000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    System.out.println("work has completed"); 
    return str.length(); 
    }); 

    StepVerifier.create(doWithSession(fun1,1000)) 
    .verifyError(TimeoutException.class); 
} 

と質問:

  1. fun1の呼び出しを中断し、すぐにエラーを返すためにどのように? (多分私は何か間違っていますが、タイムアウト後ではなくコールバックの呼び出し後にエラーが返されます)
  2. なぜdoOnSuccessdoOnCancelが同時に呼び出されますか? (私が期待される(1)または(2)が呼び出されますが、両方ではない)
  3. そして、どのように次のような場合に対処するために:
    • コードでMono.just("hello")が接続を取得していることを想像。
    • callback)私は接続して何らかの結果を得ています(私の場合はMono<Integer>)。
    • (成功時または失敗時)セッションを解放したい((1)でこれを実行しようとしています)。答えはスケジューラを使用することであるように第一の質問には

答えて

1

1)あなたが見つけたとおり、.publishOn(Schedulers.single())を使用してください。これにより、呼び出し可能オブジェクトが別のスレッドで呼び出され、そのスレッドのみがブロックされます。加えて、コール可能を取り消すことができます。

2)チェーンの順序が重要です。 composeの先頭に.doOnSuccessを入れます(後で再利用するためにその作成関数を抽出しない限り、その特定の例は本当に必要ありません)。つまり、基本的にはMono.justからの通知を受け取り、処理が行われる前でもソースが照会されるとすぐに実行されます。同じdoOnCancelと同じです。キャンセルはtimeoutトリガーから来ます。

3)リソースからシーケンスを作成し、リソースがクリーンアップされるように工場があります:Mono.using

public <T> Mono<T> doWithConnection(Function<String, Mono<T>> callback, long timeout) { 
    return Mono.using(
      //the resource supplier: 
      () -> { 
       System.out.println("connection acquired"); 
       return "hello"; 
      }, 
      //create a Mono out of the resource. On any termination, the resource is cleaned up 
      connection -> Mono.just(connection) 
           //the blocking callable needs own thread: 
           .publishOn(Schedulers.single()) 
           //execute the callable and get result... 
           .then(callback::apply) 
           //...but cancel if it takes too long 
           .timeoutMillis(timeout) 
           //for demonstration we'll log when timeout triggers: 
           .doOnError(TimeoutException.class, e -> System.out.println("timed out")), 
      //the resource cleanup: 
      connection -> System.out.println("cleaned up " + connection)); 
} 

呼び出し可能なのT値のMono<T>を返します。だから、そのようになります。プロダクションコードでは、値を処理するためにそれを購読します。テストでは、StepVerifier.create()があなたに加入します。

connection acquired 
start some long timed work 
1s... 
2s... 
timed out 
cleaned up hello 

そして、我々は5000の上にタイムアウトを置けば、私たちは次のことを得る:

@Test 
public void testDoWithSession2() throws Exception { 
    Function<String, Mono<Integer>> fun1 = str -> Mono.fromCallable(() -> { 
     System.out.println("start some long timed work"); 
     //for demonstration we'll print some clock ticks 
     for (int i = 1; i <= 5; i++) { 
      try { 
       Thread.sleep(1000); 
       System.out.println(i + "s..."); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     System.out.println("work has completed"); 
     return str.length(); 
    }); 

    //let two ticks show up 
    StepVerifier.create(doWithConnection(fun1,2100)) 
       .verifyError(TimeoutException.class); 
} 

をこの出力:

はのは、あなたの実行時間の長いタスクと、それは出力内容を確認することを実証してみましょう。 (ステップベリファイアがタイムアウトを期待するため、アサーションエラーが発生します)。

connection acquired 
start some long timed work 
1s... 
2s... 
3s... 
4s... 
5s... 
work has completed 
cleaned up hello 

java.lang.AssertionError: expectation "expectError(Class)" failed (expected: onError(TimeoutException); actual: onNext(5) 
0

に見えます:

Mono<Integer> doWithSession(Function<String, Mono<Integer>> callback, long timeout) { 
    Scheduler single = Schedulers.single(); 
    return Mono.just("hello") 
      .compose(monostr -> monostr 
        .publishOn(single) // use scheduler 
        .then(callback::apply) 
        .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout))) 
      ); 
} 

第三の問題は、この方法を解決することができます。

private Mono<Integer> doWithSession3(Function<String, Mono<Integer>> callback, long timeout) { 
    Scheduler single = Schedulers.single(); 
    return Mono.just("hello") 
      .then(str -> Mono.just(str) // here wrapping our string to new Mono 
        .publishOn(single) 
        .then(callback::apply) 
        .timeoutMillis(timeout, Mono.error(new TimeoutException("Timeout after " + timeout))) 
        .doAfterTerminate((res, throwable) -> System.out.println("Do anything with your string" + str)) 
      ); 
} 
関連する問題