スレッドは実行シーケンスです。それらは、リニアなC++プログラムとほぼ同じように動作し、メモリモデル内に埋め込まれているため、他の実行スレッドによって発生した状態変化を通知したり通知したりすることができます。
スレッドへのコールバックは、スレッドからの協力なしには実行順序を引き継ぐことはできません。通知したいスレッドは、メッセージが到着して処理されているかどうかを明示的にチェックする必要があります。
メッセージへの応答を処理する一般的な方法は2つあります。
最初はstd::future
のような方法です。その中で、呼び出し側はある種のトークンを取得し、そのトークンは将来生成される可能性のある、または将来生成される応答を表します。
もう1つは、もう一度メッセージングを使用することです。応答を要求するメッセージをBに送信します。 Bは応答を含むメッセージをAに送り返す。 Bがメッセージを受信するのと同じ方法で、Aはメッセージを受信します。メッセージには、元のメッセージにリンクするための何らかの種類の「返品対象」が含まれている場合があります。
メッセージベースのシステムでは、一般的に「イベントループ」があります。大規模でリニアなプログラムの代わりに、スレッドは「イベントループ」に戻ります。そこではメッセージのキューをチェックし、メッセージがない場合はメッセージを待ちます。
このようなシステムでは、タスクをバイトサイズのチャンクに分割する必要があります。そのため、頻繁に応答するイベントループを確認する必要があります。
これを行う1つの方法は、独自のエグゼキュータを持たない実行状態(両方を所有するスレッドなど)であるコルーチンを使用することです。コルーチンは定期的に優先順位をあきらめ、「後で状態を保存する」。
今後の解決策が最も簡単な場合もありますが、定期的に応答を確認しています。
生産者と消費者の任意の数がキューに物事を渡し、フロントをそれらをオフに食べることができますまず、threaded_queue<T>
、:
今
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back(T t) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); });
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
、我々はそのようなキューにタスクを渡したい、と持っていますその結果を得るためにタスクに合格した人。ここでは、パッケージタスクと上記の使用である:
template<class...Args>
struct threaded_task_queue {
threaded_task_queue() = default;
threaded_task_queue(threaded_task_queue&&) = delete;
threaded_task_queue& operator=(threaded_task_queue&&) = delete;
~threaded_task_queue() = default;
template<class F, class R=std::result_of_t<F&(Args...)>>
std::future<R> queue_task(F task) {
std::packaged_task<R(Args...)> p(std::move(task));
auto r = p.get_future();
tasks.push_back(std::move(p));
return r;
}
void terminate() {
tasks.terminate();
}
std::function<void(Args...)> pop_task() {
auto task = tasks.pop_front();
if (!task) return {};
auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task));
return [task_ptr](Args...args){
(*task_ptr)(std::forward<Args>(args)...);
};
}
private:
threaded_queue<std::packaged_task<void(Args...)>> tasks;
};
私はそれが右やった場合、それは次のように動作します。
Aは、キューラムダの形でBにタスクを送信します。このラムダは、(Bによって提供される)いくつかの固定セットの引数をとり、何らかの値を返します。
Bは、キューをポップし、引数をとるstd::function
を取得します。それはそれを呼び出す。 Bの文脈ではvoid
を返します。
Aは、タスクをキューに入れたときにfuture<R>
と指定されました。それが完了したかどうかを調べるためにこれを問い合わせることができます。
あなたは、「完了」したことを「通知」することはできません。それには別の解決策が必要です。しかし、AがBの結果を待つことなく進歩できないようになると、このシステムは機能します。
一方、Aがそのようなメッセージを大量に蓄積し、そのようなBから多くの人がデータを返すまで(またはユーザーが何かを行うまで)入力を待つ必要がある場合は、 a std::future<R>
。一般的なパターン(将来の計算を表すトークンを持つ)は固いものです。しかし、将来の計算やメッセージループなどの複数のソースでうまくいくようにする必要があります。
コードはテストされていません。
threaded_task_queue
ための一つのアプローチにあなたがメッセージを送信している次のとおりです。
template<class Signature>
struct message_queue;
template<class R, class...Args>
struct message_queue<R(Args...) :
threaded_task_queue< std::function<R(Args...)> >
{
std::future<R> queue_message(Args...args) {
return this->queue_task(
[tup = std::make_tuple(std::forward<Args>(args)...)]
(std::function<R(Args...)> f) mutable
{
return std::apply(f, std::move(tup));
}
);
}
bool consume_message(std::function<R(Args...)> f)
{
auto task = pop_task();
if (!task) return false;
task(std::move(f));
return true;
}
};
プロバイダ側で、あなたは
Args...
を提供し、消費者の側にあなたが
Args...
を消費し、
R
を返す
、およびプロバイダ側で、あなたはコンシューマーが完了したら結果を得るためにfuture<R>
があります。
これは、私が書いた生のthreaded_task_queue
よりも自然かもしれません。
std::apply
はC++ 17ですが、C++ 11およびC++ 14用のワイルドで実装されています。
おそらく、スレッドがお互いにアクセスできるようにしたくないでしょう。このようなことを処理する通常の方法は、メッセージキューです。 1つのスレッドはメッセージを入れ、もう1つのスレッドはメッセージを出します。キューは同期の詳細を処理できます。 ZeroMQは、このタイプのもののための人気のあるライブラリであるようです。 – vincent
@Vincent私はすでに 'B'クラスのキューを使用しています。 'B'は待ち行列を所有し、' A'は 'add_message(MessageType msg);'メソッドを通じて待ち行列にメッセージを追加します。私が失われたところでは、「A」は返信を期待しています。だから、どういう時にそれがわかるのですか? –
スレッドが互いのメモリを破壊するのを防ぐには、ミューテックスまたはその他の同期メカニズムが必要です。同じメモリ(変数)を同期せずに異なるスレッドから読み書きすることはできません。 –