2012-07-26 15 views
9

私はboost::asio::io_serviceを基本スレッドプールとして使用しています。いくつかのスレッドがio_serviceに追加され、メインスレッドがハンドラのポスティングを開始し、ワーカースレッドがハンドラの実行を開始し、すべてが終了します。ここまでは順調ですね;私はシングルスレッドのコードよりもはるかに高速です。Boost Asioでのポストキューサイズの設定制限?

しかし、メインスレッドには何百万ものものがあります。そして、それを投稿するだけで、ワーカースレッドが処理できるよりもはるかに高速です。私はRAM制限を打つことはありませんが、それでもたくさんのものをエンキューするのは馬鹿です。私がしたいのは、ハンドラ・キューのための固定サイズを持ち、キューが満杯の場合にpost()ブロックを持つことです。

Boost ASIOのドキュメントでは、このオプションは表示されません。これは可能ですか?

答えて

0

ストランドオブジェクトを使用してイベントを配置し、メインに遅延を入れることができますか?すべての作品が投稿された後、あなたのプログラムは脱落していますか?もしそうなら、io_serviceが停止したときをより詳細に制御できる作業オブジェクトを使用することができます。

スレッドの状態を常にメインにチェックして、スレッドが空いているかどうかを待つことができます。

//リンク

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service__strand.html

http://www.boost.org/doc/libs/1_40_0/doc/html/boost_asio/reference/io_service.html

//example from the second link 
boost::asio::io_service io_service; 
boost::asio::io_service::work work(io_service); 

は、このことができます願っています。

+0

問題は、作業を完了する前に 'io_service'が停止しているということではなく、' io_service'を正常に停止させるために 'work'オブジェクトを削除することについて知っています。問題は、 'io_service'があまりにも多くのタスクを蓄積できることです。割り当てられていないタスクの数を、タスクを作成するスレッドの部分にポーリングすることを含まないように制限したいので、 'poll()'をブロックすることができるかどうかという疑問があります。 – uckelman

2

ハンドラのキューサイズを修正するためにセマフォを使用しています。次のコードは、このソリューションを示しています

void Schedule(boost::function<void()> function) 
{ 
    semaphore.wait(); 
    io_service.post(boost::bind(&TaskWrapper, function)); 
} 

void TaskWrapper(boost::function<void()> &function) 
{ 
    function(); 
    semaphore.post(); 
} 
1

あなたが「進行中」の作業を数えるの世話をするだろう、別のラムダであなたのラムダをラップして、あまりにも多くの進行中のタスクがある場合は投稿する前に待つことができます。

例:

#include <atomic> 
#include <chrono> 
#include <future> 
#include <iostream> 
#include <mutex> 
#include <thread> 
#include <vector> 
#include <boost/asio.hpp> 

class ThreadPool { 
    using asio_worker = std::unique_ptr<boost::asio::io_service::work>; 
    boost::asio::io_service service; 
    asio_worker service_worker; 
    std::vector<std::thread> grp; 
    std::atomic<int> inProgress = 0; 
    std::mutex mtx; 
    std::condition_variable busy; 
public: 
    ThreadPool(int threads) : service(), service_worker(new asio_worker::element_type(service)) { 
    for (int i = 0; i < threads; ++i) { 
     grp.emplace_back([this] { service.run(); }); 
    } 
    } 

    template<typename F> 
    void enqueue(F && f) { 
    std::unique_lock<std::mutex> lock(mtx); 
    // limit queue depth = number of threads 
    while (inProgress >= grp.size()) { 
     busy.wait(lock); 
    } 
    inProgress++; 
    service.post([this, f = std::forward<F>(f)]{ 
     try { 
     f(); 
     } 
     catch (...) { 
     inProgress--; 
     busy.notify_one(); 
     throw; 
     } 
     inProgress--; 
     busy.notify_one(); 
    }); 
    } 

    ~ThreadPool() { 
    service_worker.reset(); 
    for (auto& t : grp) 
     if (t.joinable()) 
     t.join(); 
    service.stop(); 
    } 
}; 

int main() { 
    std::unique_ptr<ThreadPool> pool(new ThreadPool(4)); 
    for (int i = 1; i <= 20; ++i) { 
    pool->enqueue([i] { 
     std::string s("Hello from task "); 
     s += std::to_string(i) + "\n"; 
     std::cout << s; 
     std::this_thread::sleep_for(std::chrono::seconds(1)); 
    }); 
    } 
    std::cout << "All tasks queued.\n"; 
    pool.reset(); // wait for all tasks to complete 
    std::cout << "Done.\n"; 
} 

出力:

Hello from task 3 
Hello from task 4 
Hello from task 2 
Hello from task 1 
Hello from task 5 
Hello from task 7 
Hello from task 6 
Hello from task 8 
Hello from task 9 
Hello from task 10 
Hello from task 11 
Hello from task 12 
Hello from task 13 
Hello from task 14 
Hello from task 15 
Hello from task 16 
Hello from task 17 
Hello from task 18 
All tasks queued. 
Hello from task 19 
Hello from task 20 
Done. 
0

たぶん、ワーカースレッドがビジー状態得れば、彼らはメインスレッドとシステムの自己制限を飢えさせるように、メインスレッドの優先順位を下げてみてください。

関連する問題