ここは私のコードですが、動作しますが、何度か繰り返しても何のエラーもなく停止します。おそらくレースやデッドロックが原因です。スレッドベクトルとキューを持つパイプラインC++
コードの目的は、コード化されたアプリケーションをモデル化することです。いくつかの偽のランダムフレームを作成した後、私のパイプラインのステージはまずフレームに型を与え、いくつかのランダムな操作でコード化します。
これを行うには、いくつかの共有キューと同時に使用される2つの異なるスレッドベクトル(各ステージに1つずつ)を使用しました。一方のスレッドがフレームをプッシュした後、もう一方のベクトルで別のスレッドによってポップされ、 "
#include <iostream>
#include <vector>
#include <algorithm>
#include "SafeQueue.h"
using namespace std;
const int DATA_MAG = 256;
struct Frame
{
int num;
char type;
bool encoded;
vector<vector<int>> grid;
};
void do_join(thread& t)
{
t.join();
}
void join_all(vector<thread>& v)
{
for_each(v.begin(),v.end(),do_join);
}
void create_input (Queue<Frame>& stream, int num_frames, int height, int width)
{
for (int i = 0; i < num_frames; i++)
{
vector<vector<int>>tmp_grid(height, vector<int>(width, 0));
Frame frame;
for (int j = 0; j < height; j++)
{
for (int k = 0; k < width; k++)
{
tmp_grid[j][k] = rand()%DATA_MAG;
}
}
frame.grid = tmp_grid;
frame.num = i;
stream.push(frame);
}
}
void decide_type(int preset, Queue<Frame>& stream, Queue<Frame>& typed, vector<char>& param, int num_frames)
{
cout<<"hello from decide"<<" "<<endl;
for(int i = 0; i < num_frames; i++)
{
Frame tmp = stream.pop();
int j = rand() % 10;
if(j < preset)
{
tmp.type = 'I';
}
else
{
tmp.type = 'B';
}
param[tmp.num] = tmp.type;
typed.push(tmp);
}
}
void decode_flow(int preset, Queue<Frame>& typed, vector<Frame>& encoded,
vector<char>& parameters, int num_frames, int height, int width)
{
cout<<"hello from decode"<<" "<<endl;
for(int i = 0; i < num_frames; i++)
{
Frame f = typed.pop();
if (f.type == 'I')
{
cout<<"hi from I"<<" "<<endl;
for (int j = 0; j < height; j++)
{
for (int k = 0; k < width; k++)
{
f.grid[j][k] = f.grid[j][k] * 2;
}
}
}
else cout<<"hi from B"<<" "<<endl;
encoded.push_back(f);
}
}
int main()
{
srand(time(NULL));
int num_threadsXstage = 2;
int width = 500;
int height = 500;
int num_frames = 100;
int frames_thread = num_frames/num_threadsXstage;
int preset = 3;
vector<Frame> final;
//Vectors of threads
vector<thread> typer;
vector<thread> encoder;
//Vector of parameters
vector<char> parameters(num_frames);
//Working queues
Queue<Frame> created;
Queue<Frame> typed;
//Final vector
vector<Frame> encoded(num_frames);
//Movie creation
create_input(created, num_frames, height, width);
for (int i = 0; i < num_threadsXstage; i++)
{
//stage 1
typer.push_back(thread(bind(&decide_type, preset, ref(created),
ref(typed), ref(parameters), frames_thread)));
//stage 2
encoder.push_back(thread(bind(&decode_flow, preset, ref(typed), ref(encoded),
ref(parameters), frames_thread, height, width)));
}
// JOIN
join_all(typer);
join_all(encoder);
for (int i = 0; i < num_frames; i++)
{
Frame k = typed.pop();
cout<<k.type<<" ";
}
cout<<endl<<endl;
for (int i = 0; i < num_frames; i++)
{
cout<<parameters[i]<<" ";
}
}
これは私のスレッドセーフなキューのコードです。少なくとも、それは想定されています。すべてのスレッドが終了した後
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
using namespace std;
template <typename T>
class Queue
{
private:
queue<T> queue_;
mutex mutex_;
condition_variable cond_;
public:
T pop()
{
unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
auto val = queue_.front();
queue_.pop();
return val;
}
void pop(T& item)
{
unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
item = queue_.front();
queue_.pop();
}
void push(const T& item)
{
unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
mlock.unlock();
cond_.notify_one();
}
Queue()=default;
Queue(const Queue&) = delete; // disable copying
Queue& operator=(const Queue&) = delete; // disable assignment
};
あなたの問題とは無関係ですが、スレッドを作成するときに 'std :: bind'は必要ありません。例: 'thread(&decide_type、preset、ref(created)、ref(typed)、ref(parameters)、frames_thread)'は正常に動作します。 –