2012-08-28 10 views
6

boost::asioとというオブジェクトを使用してスレッドプールを実装しました。boost::asio::io_service::run()を呼び出しています。しかし、私が与えられた要件は、すべてのスレッドを監視して "正常性"を確認する方法です。私の目的は、スレッドプールを通過できる単純なセンチネルオブジェクトを作成することです。スレッドが完了すると、そのスレッドはまだ処理中であるとみなすことができます。boost :: asio、スレッドプールとスレッドの監視

しかし、私の実装を考えると、私は(あれば)、私は確実に、プール内のすべてのスレッドを監視することができますかどうかはわかりません。私は単純にそう実際にそのセンチネルを取得し、作業を行いますどのスレッド保証するものではありませんio_serviceインスタンスにセンチネルオブジェクトを掲示、boost::asio::io_service::run()にスレッド機能を委任しました。

1つのオプションは、単に定期的にセンチネルを挿入し、それが少なくとも一度は時間のいくつかの合理的な量で、各スレッドによってピックアップされることを願っていますが、それは明らかに理想的ではありませんすることであってもよいです。

は、次の例を見てみましょう。原因ハンドラがコード化される方法に、この例では、我々は、各スレッドが同じ作業量を尽くすことがわかりますが、実際には、私はハンドラ実装のコントロールを持っていない、他の人がほとんどだろうしながら、いくつかは、長時間実行することができます即時。

#include <iostream> 
#include <boost/asio.hpp> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 

void handler() 
{ 
    std::cout << boost::this_thread::get_id() << "\n"; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 
} 

int main(int argc, char **argv) 
{ 
    boost::asio::io_service svc(3); 

    std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc)); 

    boost::thread one(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread two(boost::bind(&boost::asio::io_service::run, &svc)); 
    boost::thread three(boost::bind(&boost::asio::io_service::run, &svc)); 

    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 
    svc.post(handler); 

    work.reset(); 

    three.join(); 
    two.join(); 
    one.join(); 

    return 0; 
} 

答えて

2

:あなたが行うことができますスレッドプールでタスクを投稿する

。統計を更新するラッパータイプを作成し、スレッドプールに投稿されたユーザー定義のハンドラーをコピーします。このラッパータイプのみが基礎となるio_serviceに転記されます。このメソッドを使用すると、ユーザーコードに侵入することなく、投稿/実行されるハンドラーを追跡できます。

は、ここでストリップダウンし、簡単な例です:

#include <iostream> 
#include <memory> 
#include <vector> 
#include <boost/thread.hpp> 
#include <boost/asio.hpp> 

// Supports scheduling anonymous jobs that are 
// executable as returning nothing and taking 
// no arguments 
typedef std::function<void(void)> functor_type; 

// some way to store per-thread statistics 
typedef std::map<boost::thread::id, int> thread_jobcount_map; 

// only this type is actually posted to 
// the asio proactor, this delegates to 
// the user functor in operator() 
struct handler_wrapper 
{ 
    handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics) 
     : user_functor_(user_functor) 
     , statistics_(statistics) 
    { 
    } 

    void operator()() 
    { 
     user_functor_(); 

     // just for illustration purposes, assume a long running job 
     boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 

     // increment executed jobs 
     ++statistics_[boost::this_thread::get_id()]; 
    } 

    functor_type   user_functor_; 
    thread_jobcount_map& statistics_; 
}; 

// anonymous thread function, just runs the proactor 
void thread_func(boost::asio::io_service& proactor) 
{ 
    proactor.run(); 
} 

class ThreadPool 
{ 
public: 
    ThreadPool(size_t thread_count) 
    { 
     threads_.reserve(thread_count); 

     work_.reset(new boost::asio::io_service::work(proactor_)); 

     for(size_t curr = 0; curr < thread_count; ++curr) 
     { 
     boost::thread th(thread_func, boost::ref(proactor_)); 

     // inserting into this map before any work can be scheduled 
     // on it, means that we don't have to look it for lookups 
     // since we don't dynamically add threads 
     thread_jobcount_.insert(std::make_pair(th.get_id(), 0)); 

     threads_.emplace_back(std::move(th)); 
     } 
    } 

    // the only way for a user to get work into 
    // the pool is to use this function, which ensures 
    // that the handler_wrapper type is used 
    void schedule(const functor_type& user_functor) 
    { 
     handler_wrapper to_execute(user_functor, thread_jobcount_); 
     proactor_.post(to_execute); 
    } 

    void join() 
    { 
     // join all threads in pool: 
     work_.reset(); 
     proactor_.stop(); 

     std::for_each(
     threads_.begin(), 
     threads_.end(), 
     [] (boost::thread& t) 
     { 
     t.join(); 
     }); 
    } 

    // just an example showing statistics 
    void log() 
    { 
     std::for_each(
     thread_jobcount_.begin(), 
     thread_jobcount_.end(), 
     [] (const thread_jobcount_map::value_type& it) 
     { 
     std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n"; 
     }); 
    } 

private: 
    std::vector<boost::thread> threads_; 
    std::unique_ptr<boost::asio::io_service::work> work_; 
    boost::asio::io_service proactor_; 
    thread_jobcount_map  thread_jobcount_; 
}; 

struct add 
{ 
    add(int lhs, int rhs, int* result) 
     : lhs_(lhs) 
     , rhs_(rhs) 
     , result_(result) 
    { 
    } 

    void operator()() 
    { 
     *result_ = lhs_ + rhs_; 
    } 

    int lhs_,rhs_; 
    int* result_; 
}; 

int main(int argc, char **argv) 
{ 
    // some "state objects" that are 
    // manipulated by the user functors 
    int x = 0, y = 0, z = 0; 

    // pool of three threads 
    ThreadPool pool(3); 

    // schedule some handlers to do some work 
    pool.schedule(add(5, 4, &x)); 
    pool.schedule(add(2, 2, &y)); 
    pool.schedule(add(7, 8, &z)); 

    // give all the handlers time to execute 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 

    std::cout 
     << "x = " << x << "\n" 
     << "y = " << y << "\n" 
     << "z = " << z << "\n"; 

    pool.join(); 

    pool.log(); 
} 

出力:

x = 9 
y = 4 
z = 15 
Thread: 0000000000B25430 executed 1 jobs 
Thread: 0000000000B274F0 executed 1 jobs 
Thread: 0000000000B27990 executed 1 jobs 
+0

@Chadの回答にコードを追加できますか? –

+0

完了。それに対するフィードバックがあれば幸いです。 – Chad

6

あなたはすべてのスレッドとスレッドごとのプライベートio_serviceインスタンス間で共通io_serviceインスタンスを使用することができます。すべてのスレッドは、次のようにメソッドを実行します:

void Mythread::threadLoop() 
{ 
    while(/* termination condition */) 
    { 
     commonIoService.run_one(); 
     privateIoService.run_one(); 

     commonConditionVariable.timed_wait(time); 
    } 
} 

このところで、あなたはいくつかのタスクをスレッドで実行されていることを確認したい場合は、あなただけのその所有io_serviceでこのタスクを投稿しなければなりません。私が使用した溶液は、私はトレッドプールオブジェクトの実装を所有しているという事実に依存している

void MyThreadPool::post(Hander handler) 
{ 
    commonIoService.post(handler); 
    commonConditionVariable.notify_all(); 
} 
+0

興味深いアプローチが、私はもう少し単純な何かを探しています。何日か後に何も現れなければ、私はこの答えを受け入れるかもしれない。 – Chad

+0

私はboot asioを使用すると簡単な解決策はないと思います。私はこのような解決策をいくつかのコードで開発しています。 –