2017-04-13 7 views
1

Boost.Asioのストランドと優先ラッパーを同時に使用したいと思います。Boost Asioにストランドラッパーと優先ラッパーを組み合わせる方法

私は私のコードを書く前に、私は次の情報を読んだ:私は、ラッパーのアプローチを使用したい

Boost asio priority and strand

boost::asio and Active Object

http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531

Why do I need strand per connection when using boost::asio?

をなぜなら、async_readなどのさまざまな非同期APIを使用したいからです。 sync_write、およびasync_connectです。 http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531によると、優先ラッパーとストランドラッパーを組み合わせることができます。

は、だから私は、次の例に基づいてコードを書いた:ここ

http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp

は私のコードです:

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     wrapped_handler(handler_priority_queue& q, int p, Handler h) 
      : queue_(q), priority_(p), handler_(std::move(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_PRIORITY 
       ) 
#endif 
#if ENABLE_STRAND 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

ラッパーは、次のマクロで有効になっています

両方のマクロを有効にすると、私は次のような結果を得ましたT:

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
execute(3) 
execute(2) 
execute(1) 
execute(0) 
before run_one() 
before run_one() 
before run_one() 

私は

[called] 1,140512649541376 

として

[called] priority,thread_id 

出力を得たが、私はそれを取得していないことを期待しています。

​​では、function_()が呼び出されますが、wrapped_handler::operator()は呼び出されていないようです。 (機能​​は私のコードでpq.execute_all();から呼び出されます。)function_()が呼び出された後、私はシーケンスをトレースし

void execute() { 
    std::cout << "execute(" << priority_ << ")" << std::endl; 
    function_(); // It is called. 
} 

template <typename Handler> 
class wrapped_handler { 
public: 

    template <typename... Args> 
    void operator()(Args&&... args) { // It is NOT called 
     std::cout << "operator() " << std::endl; 
     handler_(std::forward<Args>(args)...); 
    } 

次の関数が呼び出されます。

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94

が次に機能bool strand_service::do_dispatch(implementation_type& impl, operation* op)で、操作opが呼び出されていないが、キューint型に次の行をプッシュ:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L111

function_()がstrand_serviceに発送される理由がわかりません。私は、ストランドのラッパーがすでに私のコードでは、次の点でunwrapedされていることを考える:

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

私は唯一の優先ラッパーを有効にした場合、私は以下の結果を得ました。 期待通りに働いているようです。

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
operator() 
[called] 4,140512649541376 
execute(3) 
operator() 
[called] 3,140512649541376 
execute(2) 
operator() 
[called] 2,140512649541376 
execute(1) 
operator() 
[called] 1,140512649541376 
execute(0) 
operator() 
[called] 0,140512649541376 
before run_one() 
before run_one() 
before run_one() 

ストランドラッパーのみを有効にした場合、次の結果が得られます。 私は期待通りに働いているようです。

before run_one() 
[called] 0,140127385941760 
before poll_one() 
[called] 1,140127385941760 
[called] 2,140127385941760 
[called] 3,140127385941760 
[called] 4,140127385941760 
before execute_all() 
before run_one() 
before run_one() 
before run_one() 

答えて

1

私はこの問題を解決しました。

なぜfunction_()がstrand_serviceにディスパッチされるのかわかりません。パラメータfが元のハンドラである

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

:私は、ストランドのラッパーがすでに私のコードでは、次の点でunwrapedされていることと思います。これは、優先度キューラップとストランドラップハンドラを意味します。ストランドラッパーは外側です。したがって、fを呼び出すとき、それはstrand_serviceにディスパッチされます。このプロセスは同じstrand_serviceで行われるため、ハンドラは呼び出されません。次のように代わりにfの優先順位キューにh->handler_を追加し、この問題を解決するための

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

handler_はクラステンプレートwrapped_handlerのメンバ変数です。ラップされていないハンドラを保持します。ここで

は完全なコードです:

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     template <typename HandlerArg> 
     wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h) 
      : queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_STRAND 
       ) 
#endif 
#if ENABLE_PRIORITY 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

そして、ここでは、出力されます。

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
[called] 4,139903315736320 
execute(3) 
[called] 3,139903315736320 
execute(2) 
[called] 2,139903315736320 
execute(1) 
[called] 1,139903315736320 
execute(0) 
[called] 0,139903315736320 
before run_one() 
before run_one() 
before run_one() 
関連する問題