2017-12-31 266 views
1

私はスレッドプールパターンに非常にうまく適合する、同時に問題を解決しようとしています。ここで私は、最小限の代表的な例を提供しようとします:C++のキュー上のスレッドプール

たちは、このような疑似プログラムを持って言う:

Q : collection<int> 
while (!Q.empty()) { 
    for each q in Q { 
     // perform some computation 
    } 
    // assign a new value to Q 
    Q = something_completely_new(); 
} 

私はn-1労働者と一つのメインで、並列な方法でそれを実現しようとしています糸。作業者は、Qの要素をつかんで内側のループで計算を実行します。

これは、マスタースレッドがQが割り当てられていることを通知するworkという2つの条件変数を使用して解決しようとしました。もう1つはwork_doneです。それがセグメンテーションフォールトのように見えるグーグルのしばらく後

Working on 0 
Working on 1 
Working on 2 
Working on 3 
Working on 0 
Working on 1 
Working on 2 
Working on 0 
Working on 1 
Working on 0 
libc++abi.dylib: terminating 
Abort trap: 6 

:私は次の出力を得るg++ -std=c++11 -Wall -o main main.cppとOS X上でコンパイルした後、

#include <iostream> 
#include <mutex> 
#include <condition_variable> 
#include <queue> 
#include <thread> 

using namespace std; 

std::queue<int> Q; 
std::mutex mut; 
std::condition_variable work; 
std::condition_variable work_done; 

void run_thread() { 
    for (;;) { 
     std::unique_lock<std::mutex> lock(mut); 
     work.wait(lock, [&] { return Q.size() > 0; }); 

     // there is work to be done - pretend we're working on something 
     int x = Q.front(); Q.pop(); 
     std::cout << "Working on " << x << std::endl; 

     work_done.notify_one(); 
    } 
} 

int main() { 
    // your code goes here 
    std::vector<std::thread *> workers(3); 

    for (size_t i = 0; i < 3; i++) { 
     workers[i] = new std::thread{ 
      [&] { run_thread(); } 
     }; 
    } 

    for (int i = 4; i > 0; --i) { 
     std::unique_lock<std::mutex> lock(mut); 
     Q = std::queue<int>(); 
     for (int k = 0; k < i; k++) { 
      Q.push(k); 
     } 
     work.notify_all(); 
     work_done.wait(lock, [&] { return Q.size() == 0; }); 
    } 

    for (size_t i = 0; i < 3; i++) { 
     delete workers[i]; 
    } 

    return 0; 
} 

残念なことに:

はここに私のC++のコードです。おそらく私は条件変数を誤用しているのです。私はいくつかの洞察力、両方のアーキテクチャー(このタイプの問題にアプローチする方法について)と具体的に、私がここで間違っているのと同じように、感謝するでしょう。

私は

+0

なく、あなたのクラッシュの問題を終了するいくつかの方法が必要になります(run_thread 'で' Q')のあなたのアクセスは 'スレッドセーフではないことに注意してください。' std :: condition_variable :: wait() 'はロックを返します。述語を取る 'wait()'の形式のドキュメントを見ると、ロックを保持している間に述語ラムダが実行されているとは確信していません。 – marko

+0

上記のユースケースは、未来&約束(http://en.cppreference.com/w/cpp/thread/future)のための良いケースです。これは、 'work'の結果をパッキングする問題を解決し、それを待つことができます。 – marko

+0

'run_thread'の' Q'へのアクセスはスレッドセーフです。 'condition_variable :: wait'が返ってくると、mutexのロックが取得されるので、これらの行は' int x = Q.front(); Q.pop(); 'は安全です。 'wait'メソッドで述語を実行することもスレッドセーフです。ドキュメントhttp://en.cppreference.com/w/cpp/thread/condition_variable/wait、述語を含む説明バージョン'を参照してください。このメソッドに入る前に、 (ロック)終了後に再び取得されます。つまり、ロックはpred()アクセスのガードとして使用できます。したがって、ラムダの 'Q'へのアクセスは安全です。 – rafix07

答えて

2

アプリケーションがstd::terminateによって殺された。助けに感謝(各スレッドが合流可能な状態になっている)あなたのスレッド関数の

ボディは無限ループであるので、これらの線が

for (size_t i = 0; i < 3; i++) { 
    delete workers[i]; 
} 

を実行しているとき、あなたはまだ実行中のスレッドを削除したいです。破壊されたときにスレッドが合流可能である場合は、(http://www.cplusplus.com/reference/thread/thread/~thread/から)

を、以下の事が起こる合流可能な状態にあるスレッドのデストラクタを呼び出すと、terminate()が呼び出されます。

あなたはterminateが呼び出されないようにしたい場合はそう、あなたがスレッドを作成した後detach()メソッドを呼び出す必要があります。

for (size_t i = 0; i < 3; i++) { 
    workers[i] = new std::thread{ 
     [&] { run_thread(); } 
    }; 
    workers[i]->detach(); // <--- 
} 
+0

スレッドをデタッチすると、 'std :: thread'がそのスレッドを破棄してリークします。 'std :: thread'を削除する前にそれらに参加すると' main() 'は永遠に待機します(競合することなく実際にpthreadを削除するのは良い考えです)。 より良い解決策は、各スレッドが終了条件として使用できるフラグを持つことです。detach()を呼び出します。 – marko

2

キューが空であるという理由だけで、作業が完了したわけではありません。

finished = true; 
work.notify_all(); 
for (size_t i = 0; i < 3; i++) { 
    workers[i].join(); // wait for threads to finish 
    delete workers[i]; 
} 

、我々はスレッド

for (;!finshed;) { 
    std::unique_lock<std::mutex> lock(mut); 
    work.wait(lock, [&] { return Q.size() > 0 || finished; }); 
    if (finished) 
     return; 
関連する問題