2017-11-11 7 views
0

私はワーカープールを持つCalculationSupervisorアクターを持っています。
計算を行う度に、CalculationSupervisorはルータを使用してCalculationRequestをワーカーにブロードキャストします。Akkaブロードキャスト:最初の返信を取得して他の人を破棄します

最も速い計算結果を得て、他の結果を無視する必要があります。次のように

CalculationSupervisorに見えます:

public class CalculationSupervisor extends AbstractActor { 

    private Router router = new Router(new RoundRobinRoutingLogic()); 

    public static Props props() { 
     return Props.create(CalculationSupervisor.class, CalculationSupervisor::new); 
    } 

    @Override 
    public Receive createReceive() { 
     return receiveBuilder() 
       .match(RegisterWorker.class, registration -> { 
        final String workerName = registration.name(); 
        final ActorRef worker = 
         context().actorOf(Worker.props(workerName), workerName); 
        router = router.addRoutee(worker); 
       }) 
       .match(CalculationRequest.class, (request) -> { 
        router.route(new Broadcast(request), self()); 
       }) 
       .match(CalculationResult.class, (result) -> { 
        // process only the first (the fastest) result 
       }) 
       .build(); 
    } 
} 

(最速)最初の後に来るのメッセージを廃棄するロジックを実装するための最良のパターンが入って来何を引き起こすのですか?

答えて

0

スーパーバイザが複数のリクエストを処理している場合は、簡単な方法で、リクエストIDのペアとリクエストに対して受け取ったレスポンスの数を保持することをお勧めします(Map)。スーパーバイザはこのマップを検査し、処理時にその特定の要求IDの応答数がゼロである場合にのみ応答を処理します。

  1. public class CalculationSupervisor extends AbstractActor { 
        ... 
        private int poolSize = 0; 
        private Map<Long, Integer> numReplies = new HashMap<>(); 
    
        @Override 
        public Receive createReceive() { 
         return receiveBuilder() 
          .match(RegisterWorker.class, registration -> { 
           ... 
           poolSize = poolSize + 1; 
          }) 
          .match(CalculationRequest.class, request -> { 
           numReplies.putIfAbsent(request.getId(), 0); 
           router.route(new Broadcast(request), self()); 
          }) 
          .match(CalculationResult.class, result -> { 
           Long requestId = result.getRequestId(); 
           if (numReplies.contains(requestId)) { 
            int num = numReplies.get(requestId); 
            if (num == 0) { 
             // process only the first (the fastest) result 
             ... 
             numReplies.put(requestId, 1); 
            } else { 
             if (num + 1 == poolSize) 
              numReplies.remove(requestId); 
             else 
              numReplies.put(requestId, num + 1); 
            } 
           } 
          }) 
          .build(); 
        } 
    } 
    

    上記のアプローチに2つの仮定がある:結果の数がプールのサイズに等しい場合、さらに、無限に成長するマップを防止するために、スーパーバイザはマップからエントリを削除します要求IDは、CalculationRequestCalculationResultクラスの両方で使用できます(この例では、IDはLongです)。適切なものを使用してください。

  2. ルートは、要求が送信される前にスーパーバイザに登録されます。

さらに簡単な解決策は、ルータを使用しないことです。この場合、CalculationSupervisorは同じ要求に対して複数の結果を調整する必要はありません。最初の結果を除いてすべてのリクエストを廃棄しているので、最初はルータを使用することは意味がありません。

+0

CalculationSupervisorには多くのリクエストがあり、それらのすべてを処理したいと考えています。あなたのアプローチでは、CalculationRequestのいずれかのCalculationResultを取得した場合、他のリクエストの処理結果は停止します。あなたのソリューションは、いくつかのマップ計算要求識別子とそれらのステータスを使用して拡張することができます。これは私の元々の考えですが、私はよりエレガントなソリューションを探しています:) –

関連する問題