2016-05-09 2 views
0

ここは私のコードですが、動作しますが、何度か繰り返しても何のエラーもなく停止します。おそらくレースやデッドロックが原因です。スレッドベクトルとキューを持つパイプラインC++

コードの目的は、コード化されたアプリケーションをモデル化することです。いくつかの偽のランダムフレームを作成した後、私のパイプラインのステージはまずフレームに型を与え、いくつかのランダムな操作でコード化します。

これを行うには、いくつかの共有キューと同時に使用される2つの異なるスレッドベクトル(各ステージに1つずつ)を使用しました。一方のスレッドがフレームをプッシュした後、もう一方のベクトルで別のスレッドによってポップされ、 "

#include <iostream> 
#include <vector> 
#include <algorithm> 

#include "SafeQueue.h" 

using namespace std; 

const int DATA_MAG = 256; 

struct Frame 
{ 
    int num; 

    char type; 

    bool encoded; 

    vector<vector<int>> grid; 
}; 

void do_join(thread& t) 
{ 
    t.join(); 
} 

void join_all(vector<thread>& v) 
{ 
    for_each(v.begin(),v.end(),do_join); 
} 

void create_input (Queue<Frame>& stream, int num_frames, int height, int width) 
{ 
    for (int i = 0; i < num_frames; i++) 
    { 
     vector<vector<int>>tmp_grid(height, vector<int>(width, 0)); 

     Frame frame; 

     for (int j = 0; j < height; j++) 
     { 
      for (int k = 0; k < width; k++) 
      { 
       tmp_grid[j][k] = rand()%DATA_MAG; 
      } 
     } 

     frame.grid = tmp_grid; 
     frame.num = i; 

     stream.push(frame); 
    } 
} 


void decide_type(int preset, Queue<Frame>& stream, Queue<Frame>& typed, vector<char>& param, int num_frames) 
{ 
    cout<<"hello from decide"<<" "<<endl; 

    for(int i = 0; i < num_frames; i++) 
    { 
     Frame tmp = stream.pop(); 

     int j = rand() % 10; 

     if(j < preset) 
     { 
      tmp.type = 'I'; 
     } 

     else 
     { 
      tmp.type = 'B'; 
     } 

     param[tmp.num] = tmp.type; 

     typed.push(tmp); 
    } 
} 

void decode_flow(int preset, Queue<Frame>& typed, vector<Frame>& encoded, 
        vector<char>& parameters, int num_frames, int height, int width) 
{ 
    cout<<"hello from decode"<<" "<<endl; 

    for(int i = 0; i < num_frames; i++) 
    { 
     Frame f = typed.pop(); 

     if (f.type == 'I') 
     { 
      cout<<"hi from I"<<" "<<endl; 
      for (int j = 0; j < height; j++) 
      { 
       for (int k = 0; k < width; k++) 
       { 
        f.grid[j][k] = f.grid[j][k] * 2; 
       } 
      } 
     } 

     else cout<<"hi from B"<<" "<<endl; 

     encoded.push_back(f); 
    } 
} 




int main() 
{ 
    srand(time(NULL)); 

    int num_threadsXstage = 2; 

    int width = 500; 
    int height = 500; 

    int num_frames = 100; 

    int frames_thread = num_frames/num_threadsXstage; 

    int preset = 3; 

    vector<Frame> final; 

    //Vectors of threads 
    vector<thread> typer; 
    vector<thread> encoder; 

    //Vector of parameters 
    vector<char> parameters(num_frames); 

    //Working queues 
    Queue<Frame> created; 
    Queue<Frame> typed; 

    //Final vector 
    vector<Frame> encoded(num_frames); 

    //Movie creation 

    create_input(created, num_frames, height, width); 



for (int i = 0; i < num_threadsXstage; i++) 
    { 
     //stage 1 
     typer.push_back(thread(bind(&decide_type, preset, ref(created), 
            ref(typed), ref(parameters), frames_thread))); 

     //stage 2 
     encoder.push_back(thread(bind(&decode_flow, preset, ref(typed), ref(encoded), 
             ref(parameters), frames_thread, height, width))); 
    } 


    // JOIN 

    join_all(typer); 

    join_all(encoder); 


    for (int i = 0; i < num_frames; i++) 
    { 
     Frame k = typed.pop(); 

     cout<<k.type<<" "; 
    } 

    cout<<endl<<endl; 

    for (int i = 0; i < num_frames; i++) 
    { 
     cout<<parameters[i]<<" "; 
    } 
} 

これは私のスレッドセーフなキューのコードです。少なくとも、それは想定されています。すべてのスレッドが終了した後

#include <queue> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <iostream> 

using namespace std; 

template <typename T> 

class Queue 
{ 
private: 
    queue<T> queue_; 
    mutex mutex_; 
    condition_variable cond_; 

public: 

    T pop() 
    { 
     unique_lock<std::mutex> mlock(mutex_); 
     while (queue_.empty()) 
     { 
      cond_.wait(mlock); 
     } 

     auto val = queue_.front(); 
     queue_.pop(); 
     return val; 
    } 

    void pop(T& item) 
    { 
     unique_lock<std::mutex> mlock(mutex_); 

     while (queue_.empty()) 
     { 
      cond_.wait(mlock); 
     } 
     item = queue_.front(); 
     queue_.pop(); 
    } 

    void push(const T& item) 
    { 
     unique_lock<std::mutex> mlock(mutex_); 
     queue_.push(item); 
     mlock.unlock(); 
     cond_.notify_one(); 
    } 
    Queue()=default; 
    Queue(const Queue&) = delete;   // disable copying 
    Queue& operator=(const Queue&) = delete; // disable assignment 

}; 
+1

あなたの問題とは無関係ですが、スレッドを作成するときに 'std :: bind'は必要ありません。例: 'thread(&decide_type、preset、ref(created)、ref(typed)、ref(parameters)、frames_thread)'は正常に動作します。 –

答えて

2

、あなたはtypedキューからすべてのキューに入れられたフレームを抽出 - が、これは処理段の間の中間キューで、今は空です。 typed.pop()への呼び出しは永遠にブロックされます。

出力キューencodedからフレームを抽出する必要があります。

+0

正確に。 4つのスレッドが終了し、メインスレッドの作業からの結合がいくつかのcout文で簡単に確認でき、デバッグが有効でコンパイルされ、gdbで実行されていることを確認したら、簡単に確認できます(.pop() ctrl-C。 –

関連する問題