2013-03-06 17 views
14

私は、ポインタをargとし、mainをとる関数を持つプログラムを持っています。主にnスレッドを作成しており、それぞれが渡されたargに応じて異なるメモリ領域で機能を実行しています。スレッドが結合され、メインは領域間のデータの混合を実行し、古いスレッドと同じ操作を行うn新しいスレッドを作成します。同期でスレッドの寿命を延ばす(C++ 11)

プログラムを改善するために、スレッドを作成しておくのに必要な時間を省き、スレッドを有効にしたいと考えています。スレッドは、メインが動作しているときにスリープして、再び起動する必要があるときに通知する必要があります。同じように、メインはスレッドがジョインしたときと同じように動作するのを待つ必要があります。

私はこれを強く実装することはできません。常にデッドロックになります。

シンプルなベースラインコード、これを変更する方法についてのヒントがずっとあなたが望むコンセプトは、スレッドプールで

#include <thread> 
#include <climits> 

... 

void myfunc(void * p) { 
    do_something(p); 
} 

int main(){ 
    void * myp[n_threads] {a_location, another_location,...}; 
    std::thread mythread[n_threads]; 
    for (unsigned long int j=0; j < ULONG_MAX; j++) { 
    for (unsigned int i=0; i < n_threads; i++) { 
     mythread[i] = std::thread(myfunc, myp[i]); 
    } 
    for (unsigned int i=0; i < n_threads; i++) { 
     mythread[i].join(); 
    } 
    mix_data(myp); 
    } 
    return 0; 
} 
+0

正確にデッドロックですか。それは 'mythread [i] .join();にありますか? –

+0

いいえ、これはデッドロックできないコードの例です。 デッドロックしないより優れた実装(結合しているスレッドを削除しない)が見つかりません。 – DarioP

+0

あなたはおそらく[スレッドプール](http://stackoverflow.com/questions/3988128/c-thread-pool)を探していますか? – us2012

答えて

10

をいただければ幸いです。このSO questionは、既存の実装を扱います。

考えられるのは、いくつかのスレッドインスタンス用のコンテナを用意することです。各インスタンスはタスクキューをポーリングする関数に関連付けられ、タスクが使用可能になると、それをプルして実行します。タスクが終了すると(終了しても別の問題です)、スレッドは単にタスクキューにループオーバーします。

同期されたキュー、キュー上のループを実装するスレッドクラス、タスクオブジェクト用のインターフェイス、そしてすべてを駆動するクラス(プールクラス)が必要です。

また、実行する必要があるタスク(非常に特殊なメモリ領域をパラメータとして持つ)に対して非常に特殊なスレッドクラスを作成することもできます。これには、スレッドが現在の繰り返しで完了したことを示すための通知メカニズムが必要です。

スレッドのメイン関数は、その特定のタスクのループであり、1回の反復の最後に、スレッドはその終了を通知し、次のループを開始するための条件変数を待ちます。本質的には、スレッド内のタスクコードをインライン展開し、キューの必要性を完全になくします。

using namespace std; 

// semaphore class based on C++11 features 
class semaphore { 
    private: 
     mutex mMutex; 
     condition_variable v; 
     int mV; 
    public: 
     semaphore(int v): mV(v){} 
     void signal(int count=1){ 
      unique_lock lock(mMutex); 
      mV+=count; 
      if (mV > 0) mCond.notify_all(); 
     } 
     void wait(int count = 1){ 
      unique_lock lock(mMutex); 
      mV-= count; 
      while (mV < 0) 
       mCond.wait(lock); 
     } 
}; 

template <typename Task> 
class TaskThread { 
    thread mThread; 
    Task *mTask; 
    semaphore *mSemStarting, *mSemFinished; 
    volatile bool mRunning; 
    public: 
    TaskThread(Task *task, semaphore *start, semaphore *finish): 
     mTask(task), mRunning(true), 
     mSemStart(start), mSemFinished(finish), 
     mThread(&TaskThread<Task>::psrun){} 
    ~TaskThread(){ mThread.join(); } 

    void run(){ 
     do { 
      (*mTask)(); 
      mSemFinished->signal(); 
      mSemStart->wait(); 
     } while (mRunning); 
    } 

    void finish() { // end the thread after the current loop 
     mRunning = false; 
    } 
private: 
    static void psrun(TaskThread<Task> *self){ self->run();} 
}; 

classcMyTask { 
    public: 
    MyTask(){} 
    void operator()(){ 
     // some code here 
    } 
}; 

int main(){ 
    MyTask task1; 
    MyTask task2; 
    semaphore start(2), finished(0); 
    TaskThread<MyTask> t1(&task1, &start, &finished); 
    TaskThread<MyTask> t2(&task2, &start, &finished); 
    for (int i = 0; i < 10; i++){ 
     finished.wait(2); 
     start.signal(2); 
    } 
    t1.finish(); 
    t2.finish(); 
} 

提案(粗製)の実装は、上記operator()(クラスなどすなわちファンクタ)を提供しなければならないTaskタイプに依存しています。私はあなたが以前にスレッド関数本体に直接タスクコードを組み込むことができると言ったが、私はそれを知らないので、私はできるだけ抽象的に保った。スレッドの開始には1つの条件変数があり、終了のための1つの条件変数があり、どちらもセマフォインスタンスにカプセル化されています。 boost::barrierの使用を提案し、他の答えを見て

、私はこの考えを支持することができます可能であれば、そのクラスで私のセマフォクラスを交換してください、十分にテスト頼りにする方が良いということであることの理由と同じ機能セットに対して自己実装されたソリューションではなく、外部コードを維持していました。

いずれのアプローチも有効ですが、前者は柔軟性のためにわずかなパフォーマンスを放棄します。実行されるタスクが十分に長い時間を要する場合、管理および待ち行列同期コストは無視できる程度になります。

更新:コードが修正され、テストされました。単純条件変数をセマフォに置き換えました。

+0

これは私がコメントで提案したものですが、明らかにOPは彼のニーズに合っているとは考えていません。スレッドプールが過度に使用される可能性があるという彼の懸念に対処できますか? – us2012

+0

私は多かれ少なかれと思います。私は自分の携帯電話を使用しているのでサンプル実装を提供していません。また、C++ std :: threadsとcondition varsの十分な知識がありません。自信を持って書きます。編集は歓迎です。 – didierc

+0

これはプールを探す2番目の提案です。私はこれについて考えますが、それでもmutexと条件変数に基づく解決策が必要です。私はこの観点からも(また)問題にアプローチしたいと思っています。 – DarioP

5

バリア(条件変数とカウンタ上の便利なラッパー)を使用すると簡単に達成できます。基本的に、N個のスレッドがすべて「バリア」に達するまでブロックします。その後、再び「リサイクル」します。 Boostは実装を提供します。

void myfunc(void * p, boost::barrier& start_barrier, boost::barrier& end_barrier) { 
    while (!stop_condition) // You'll need to tell them to stop somehow 
    { 
     start_barrier.wait(); 
     do_something(p); 
     end_barrier.wait(); 
    } 
} 

int main(){ 
    void * myp[n_threads] {a_location, another_location,...}; 

    boost::barrier start_barrier (n_threads + 1); // child threads + main thread 
    boost::barrier end_barrier (n_threads + 1); // child threads + main thread 

    std::thread mythread[n_threads]; 

    for (unsigned int i=0; i < n_threads; i++) { 
     mythread[i] = std::thread(myfunc, myp[i], start_barrier, end_barrier); 
    } 

    start_barrier.wait(); // first unblock the threads 

    for (unsigned long int j=0; j < ULONG_MAX; j++) { 
    end_barrier.wait(); // mix_data must not execute before the threads are done 
    mix_data(myp); 
    start_barrier.wait(); // threads must not start new iteration before mix_data is done 
    } 
    return 0; 
} 
+0

+1(そのことを忘れてしまった)。 – didierc

+0

これは原則として、さまざまなマシンでコンパイルして実行し、クラスタ上で簡単に実行できる科学ソフトウェア用です。移植性のために、私は標準ライブラリまたは少なくともPOSIXを使用したいので、増強を避けてください。しかし、このソリューションは本当に、本当に素敵で簡単です!私はこれを真剣に検討します、ありがとう! – DarioP

+0

@DarioP、バリアは文字通り十数行のコードを実装するのは簡単ではありません。ここから貼り付けることができます。http://www.boost.org/doc/libs/1_53_0/boost/thread/barrier.hpp 'boost :: mutex'と' conditional_variable'をposixに相当するものに置き換えてください。 –

15

ここでは、C++ 11標準ライブラリのクラスのみを使用したアプローチが考えられます。基本的に、作成した各スレッドには、連続してチェックされる関連コマンドキュー(std::packaged_task<>オブジェクトにカプセル化されています)があります。キューが空の場合、スレッドは条件変数(std::condition_variable)を待機します。

データ競合がstd::mutexstd::unique_lock<> RAIIラッパーを使用して回避している間、メインスレッドは、各std::packaged_tast<>を提出し、その上にwait()を呼び出すに関連するstd::future<>オブジェクトを格納することによって終了する特定のジョブを待つことができます。

以下は、このデザインに従った簡単なプログラムです。コメントは、それが何をしているかを説明するのに十分でなければなりません:

#include <thread> 
#include <iostream> 
#include <sstream> 
#include <future> 
#include <queue> 
#include <condition_variable> 
#include <mutex> 

// Convenience type definition 
using job = std::packaged_task<void()>; 

// Some data associated to each thread. 
struct thread_data 
{ 
    int id; // Could use thread::id, but this is filled before the thread is started 
    std::thread t; // The thread object 
    std::queue<job> jobs; // The job queue 
    std::condition_variable cv; // The condition variable to wait for threads 
    std::mutex m; // Mutex used for avoiding data races 
    bool stop = false; // When set, this flag tells the thread that it should exit 
}; 

// The thread function executed by each thread 
void thread_func(thread_data* pData) 
{ 
    std::unique_lock<std::mutex> l(pData->m, std::defer_lock); 
    while (true) 
    { 
     l.lock(); 

     // Wait until the queue won't be empty or stop is signaled 
     pData->cv.wait(l, [pData]() { 
      return (pData->stop || !pData->jobs.empty()); 
      }); 

     // Stop was signaled, let's exit the thread 
     if (pData->stop) { return; } 

     // Pop one task from the queue... 
     job j = std::move(pData->jobs.front()); 
     pData->jobs.pop(); 

     l.unlock(); 

     // Execute the task! 
     j(); 
    } 
} 

// Function that creates a simple task 
job create_task(int id, int jobNumber) 
{ 
    job j([id, jobNumber]() 
    { 
     std::stringstream s; 
     s << "Hello " << id << "." << jobNumber << std::endl; 
     std::cout << s.str(); 
    }); 

    return j; 
} 

int main() 
{ 
    const int numThreads = 4; 
    const int numJobsPerThread = 10; 
    std::vector<std::future<void>> futures; 

    // Create all the threads (will be waiting for jobs) 
    thread_data threads[numThreads]; 
    int tdi = 0; 
    for (auto& td : threads) 
    { 
     td.id = tdi++; 
     td.t = std::thread(thread_func, &td); 
    } 

    //================================================= 
    // Start assigning jobs to each thread... 

    for (auto& td : threads) 
    { 
     for (int i = 0; i < numJobsPerThread; i++) 
     { 
      job j = create_task(td.id, i); 
      futures.push_back(j.get_future()); 

      std::unique_lock<std::mutex> l(td.m); 
      td.jobs.push(std::move(j)); 
     } 

     // Notify the thread that there is work do to... 
     td.cv.notify_one(); 
    } 

    // Wait for all the tasks to be completed... 
    for (auto& f : futures) { f.wait(); } 
    futures.clear(); 


    //================================================= 
    // Here the main thread does something... 

    std::cin.get(); 

    // ...done! 
    //================================================= 


    //================================================= 
    // Posts some new tasks... 

    for (auto& td : threads) 
    { 
     for (int i = 0; i < numJobsPerThread; i++) 
     { 
      job j = create_task(td.id, i); 
      futures.push_back(j.get_future()); 

      std::unique_lock<std::mutex> l(td.m); 
      td.jobs.push(std::move(j)); 
     } 

     // Notify the thread that there is work do to... 
     td.cv.notify_one(); 
    } 

    // Wait for all the tasks to be completed... 
    for (auto& f : futures) { f.wait(); } 
    futures.clear(); 

    // Send stop signal to all threads and join them... 
    for (auto& td : threads) 
    { 
     std::unique_lock<std::mutex> l(td.m); 
     td.stop = true; 
     td.cv.notify_one(); 
    } 

    // Join all the threads 
    for (auto& td : threads) { td.t.join(); } 
} 
+0

はい!ありがとうございます。このソリューションは障壁ほどシンプルではありませんが、理解し、実装し、管理し続けるのは難しくありません。投稿時にこのようなことがあったので、これが本当に必要なものかどうかを知る必要があります。 – DarioP

+1

@DarioP:うれしかった!確かに、これは最も簡単な解決策ではないかもしれませんが、設計上、シンプルさと柔軟性の間にはかなりの妥協点があると思います。例えば、上のプログラムはそれぞれ独自のジョブキューを持つ実行中のスレッドのセットを設定するので、そこからスレッドプールクラスを構築するのは非常に簡単です。また、他のクラスのC++ 11並行性ライブラリ(例えば、 'std :: promise')を使うことで、さらなる通信プロトコルを確立することなくジョブの進捗状況をより細かく制御することができます(例えば、仕事は利用可能です)。 –

0

以下は、いくつかのランダムなものを実行する単純なコンパイルと作業コードです。それはalegunaの障壁の概念を実装しています。各スレッドのタスク長が異なるため、強力な同期メカニズムが必要です。私は同じタスクでプールを行い、結果をベンチマークしようとします。そして、Andy Prowlが指摘している先物を使って多分、

#include <iostream> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <chrono> 
#include <complex> 
#include <random> 

const unsigned int n_threads=4; //varying this will not (almost) change the total amount of work 
const unsigned int task_length=30000/n_threads; 
const float task_length_variation=task_length/n_threads; 
unsigned int rep=1000; //repetitions of tasks 

class t_chronometer{ 
private: 
    std::chrono::steady_clock::time_point _t; 

public: 
    t_chronometer(): _t(std::chrono::steady_clock::now()) {;} 
    void reset() {_t = std::chrono::steady_clock::now();} 
    double get_now() {return std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() - _t).count();} 
    double get_now_ms() {return 
     std::chrono::duration_cast<std::chrono::duration<double,std::milli>>(std::chrono::steady_clock::now() - _t).count();} 
}; 

class t_barrier { 
private: 
    std::mutex m_mutex; 
    std::condition_variable m_cond; 
    unsigned int m_threshold; 
    unsigned int m_count; 
    unsigned int m_generation; 

public: 
    t_barrier(unsigned int count): 
    m_threshold(count), 
    m_count(count), 
    m_generation(0) { 
    } 

    bool wait() { 
     std::unique_lock<std::mutex> lock(m_mutex); 
     unsigned int gen = m_generation; 

     if (--m_count == 0) 
     { 
      m_generation++; 
      m_count = m_threshold; 
      m_cond.notify_all(); 
      return true; 
     } 

     while (gen == m_generation) 
      m_cond.wait(lock); 
     return false; 
    } 
}; 


using namespace std; 

void do_something(complex<double> * c, unsigned int max) { 
    complex<double> a(1.,0.); 
    complex<double> b(1.,0.); 
    for (unsigned int i = 0; i<max; i++) { 
    a *= polar(1.,2.*M_PI*i/max); 
    b *= polar(1.,4.*M_PI*i/max); 
    *(c)+=a+b; 
    } 
} 

bool done=false; 
void task(complex<double> * c, unsigned int max, t_barrier* start_barrier, t_barrier* end_barrier) { 
    while (!done) { 
    start_barrier->wait(); 
    do_something(c,max); 
    end_barrier->wait(); 
    } 
    cout << "task finished" << endl; 
} 

int main() { 
    t_chronometer t; 

    std::default_random_engine gen; 
    std::normal_distribution<double> dis(.0,1000.0); 

    complex<double> cpx[n_threads]; 
    for (unsigned int i=0; i < n_threads; i++) { 
    cpx[i] = complex<double>(dis(gen), dis(gen)); 
    } 

    t_barrier start_barrier (n_threads + 1); // child threads + main thread 
    t_barrier end_barrier (n_threads + 1); // child threads + main thread 

    std::thread mythread[n_threads]; 
    unsigned long int sum=0; 
    for (unsigned int i=0; i < n_threads; i++) { 
    unsigned int max = task_length + i * task_length_variation; 
    cout << i+1 << "th task length: " << max << endl; 
    mythread[i] = std::thread(task, &cpx[i], max, &start_barrier, &end_barrier); 
    sum+=max; 
    } 
    cout << "total task length " << sum << endl; 

    complex<double> c(0,0); 
    for (unsigned long int j=1; j < rep+1; j++) { 
    start_barrier.wait(); //give to the threads the missing call to start 
    if (j==rep) done=true; 
    end_barrier.wait(); //wait for the call from each tread 
    if (j%100==0) cout << "cycle: " << j << endl; 
    for (unsigned int i=0; i<n_threads; i++) { 
     c+=cpx[i]; 
    } 
    } 
    for (unsigned int i=0; i < n_threads; i++) { 
    mythread[i].join(); 
    } 
    cout << "result: " << c << " it took: " << t.get_now() << " s." << endl; 
    return 0; 
} 
+0

私はバリアソリューションが大きなパフォーマンスを向上させないと言いたいと思います。現在の実装ではスレッドに参加して新しいスレッドを作成するために、CVで通知するのとほぼ同じ時間を使用します。 – DarioP

関連する問題