2017-02-28 18 views
1

私はC++のコールバック機能を頭に入れようとしています。私が達成しようとしているのは、次のとおりです。C++ lambdaコールバックでイベントをトリガーする

私は2つのオブジェクトを持ち、それぞれに独自のスレッドがあります。 1つのオブジェクトAには、2番目のオブジェクトBへのポインタがあります。例を参照してください:私が達成しようとしています何

class A 
{ 
    public: 
    // ... 
    private: 
    std::unique_ptr<B> b; 
}; 

class B 
{ 
    public: 
    void add_message(MessageType msg); 
    // ... 
}; 

は、オブジェクトABへのポインタを使用してメッセージを追加し、他のものをやって続けたが、コールバックまたはハンドラまたはBは、Aがあるときにトリガされます何かを持っていますそのメッセージに返信してください。 Bはメッセージで何らかの処理を行い、それを他のオブジェクトに渡して独自のスレッドで処理することができますが、最終的には返信があります。 Bは、例えば私のメッセージへの返信を、持っているときにどのように私が知っていることができます。

// In class A 
MessageType m(); 
b->add_message(m) 
// A's thread continues doing other stuff 
... 
// some notification that b has a reply? 

私は私が使用したいコールバックのためのstd ::機能を使用する必要があるかもしれませんが、私は得ることができません知っていますすでに多くの例を見て、これをどうやって正確に行うのか、私の頭の中で。どんな助けもありがとうございます。私はたくさんの例を見てきましたが、私が達成しようとしていること、または理解していないものに戻すことはできません...

+1

おそらく、スレッドがお互いにアクセスできるようにしたくないでしょう。このようなことを処理する通常の方法は、メッセージキューです。 1つのスレッドはメッセージを入れ、もう1つのスレッドはメッセージを出します。キューは同期の詳細を処理できます。 ZeroMQは、このタイプのもののための人気のあるライブラリであるようです。 – vincent

+0

@Vincent私はすでに 'B'クラスのキューを使用しています。 'B'は待ち行列を所有し、' A'は 'add_message(MessageType msg);'メソッドを通じて待ち行列にメッセージを追加します。私が失われたところでは、「A」は返信を期待しています。だから、どういう時にそれがわかるのですか? –

+0

スレッドが互いのメモリを破壊するのを防ぐには、ミューテックスまたはその他の同期メカニズムが必要です。同じメモリ(変数)を同期せずに異なるスレッドから読み書きすることはできません。 –

答えて

3

スレッドは実行シーケンスです。それらは、リニアなC++プログラムとほぼ同じように動作し、メモリモデル内に埋め込まれているため、他の実行スレッドによって発生した状態変化を通知したり通知したりすることができます。

スレッドへのコールバックは、スレッドからの協力なしには実行順序を引き継ぐことはできません。通知したいスレッドは、メッセージが到着して処理されているかどうかを明示的にチェックする必要があります。


メッセージへの応答を処理する一般的な方法は2つあります。

最初はstd::futureのような方法です。その中で、呼び出し側はある種のトークンを取得し、そのトークンは将来生成される可能性のある、または将来生成される応答を表します。

もう1つは、もう一度メッセージングを使用することです。応答を要求するメッセージをBに送信します。 Bは応答を含むメッセージをAに送り返す。 Bがメッセージを受信するのと同じ方法で、Aはメッセージを受信します。メッセージには、元のメッセージにリンクするための何らかの種類の「返品対象」が含まれている場合があります。

メッセージベースのシステムでは、一般的に「イベントループ」があります。大規模でリニアなプログラムの代わりに、スレッドは「イベントループ」に戻ります。そこではメッセージのキューをチェックし、メッセージがない場合はメッセージを待ちます。

このようなシステムでは、タスクをバイトサイズのチャンクに分割する必要があります。そのため、頻繁に応答するイベントループを確認する必要があります。

これを行う1つの方法は、独自のエグゼキュータを持たない実行状態(両方を所有するスレッドなど)であるコルーチンを使用することです。コルーチンは定期的に優先順位をあきらめ、「後で状態を保存する」。


今後の解決策が最も簡単な場合もありますが、定期的に応答を確認しています。

生産者と消費者の任意の数がキューに物事を渡し、フロントをそれらをオフに食べることができますまず、threaded_queue<T>、:

template<class T> 
struct threaded_queue { 
    using lock = std::unique_lock<std::mutex>; 
    void push_back(T t) { 
    { 
     lock l(m); 
     data.push_back(std::move(t)); 
    } 
    cv.notify_one(); 
    } 
    boost::optional<T> pop_front() { 
    lock l(m); 
    cv.wait(l, [this]{ return abort || !data.empty(); }); 
    if (abort) return {}; 
    auto r = std::move(data.back()); 
    data.pop_back(); 
    return std::move(r); 
    } 
    void terminate() { 
    { 
     lock l(m); 
     abort = true; 
     data.clear(); 
    } 
    cv.notify_all(); 
    } 
    ~threaded_queue() 
    { 
    terminate(); 
    } 
private: 
    std::mutex m; 
    std::deque<T> data; 
    std::condition_variable cv; 
    bool abort = false; 
}; 

、我々はそのようなキューにタスクを渡したい、と持っていますその結果を得るためにタスクに合格した人。ここでは、パッケージタスクと上記の使用である:

template<class...Args> 
struct threaded_task_queue { 
    threaded_task_queue() = default; 
    threaded_task_queue(threaded_task_queue&&) = delete; 
    threaded_task_queue& operator=(threaded_task_queue&&) = delete; 
    ~threaded_task_queue() = default; 
    template<class F, class R=std::result_of_t<F&(Args...)>> 
    std::future<R> queue_task(F task) { 
    std::packaged_task<R(Args...)> p(std::move(task)); 
    auto r = p.get_future(); 
    tasks.push_back(std::move(p)); 
    return r; 
    } 
    void terminate() { 
    tasks.terminate(); 
    } 
    std::function<void(Args...)> pop_task() { 
    auto task = tasks.pop_front(); 
    if (!task) return {}; 
    auto task_ptr = std::make_shared<std::packaged_task<R(Args...)>>(std::move(*task)); 
    return [task_ptr](Args...args){ 
     (*task_ptr)(std::forward<Args>(args)...); 
    }; 
    } 
private: 
    threaded_queue<std::packaged_task<void(Args...)>> tasks; 
}; 

私はそれが右やった場合、それは次のように動作します。

  • Aは、キューラムダの形でBにタスクを送信します。このラムダは、(Bによって提供される)いくつかの固定セットの引数をとり、何らかの値を返します。

  • Bは、キューをポップし、引数をとるstd::functionを取得します。それはそれを呼び出す。 Bの文脈ではvoidを返します。

  • Aは、タスクをキューに入れたときにfuture<R>と指定されました。それが完了したかどうかを調べるためにこれを問い合わせることができます。

あなたは、「完了」したことを「通知」することはできません。それには別の解決策が必要です。しかし、AがBの結果を待つことなく進歩できないようになると、このシステムは機能します。

一方、Aがそのようなメッセージを大量に蓄積し、そのようなBから多くの人がデータを返すまで(またはユーザーが何かを行うまで)入力を待つ必要がある場合は、 a std::future<R>。一般的なパターン(将来の計算を表すトークンを持つ)は固いものです。しかし、将来の計算やメッセージループなどの複数のソースでうまくいくようにする必要があります。

コードはテストされていません。

threaded_task_queueための一つのアプローチにあなたがメッセージを送信している次のとおりです。

template<class Signature> 
struct message_queue; 
template<class R, class...Args> 
struct message_queue<R(Args...) : 
    threaded_task_queue< std::function<R(Args...)> > 
{ 
    std::future<R> queue_message(Args...args) { 
    return this->queue_task(
     [tup = std::make_tuple(std::forward<Args>(args)...)] 
     (std::function<R(Args...)> f) mutable 
     { 
     return std::apply(f, std::move(tup)); 
     } 
    ); 
    } 
    bool consume_message(std::function<R(Args...)> f) 
    { 
    auto task = pop_task(); 
    if (!task) return false; 
    task(std::move(f)); 
    return true; 
    } 
}; 
プロバイダ側で、あなたは Args...を提供し、消費者の側にあなたが Args...を消費し、 Rを返す

、およびプロバイダ側で、あなたはコンシューマーが完了したら結果を得るためにfuture<R>があります。

これは、私が書いた生のthreaded_task_queueよりも自然かもしれません。

std::applyはC++ 17ですが、C++ 11およびC++ 14用のワイルドで実装されています。

+0

'threaded_queue :: push_back'関数内の 'cv.notify_one()'を他のコードブロックから守る理由は何ですか? – Steephen

+1

@Steephen 'notify_one'では、mutexを保持する必要はありません。だから私はしなかった。いくつかの実装では、リスニングスレッドはすぐにmutexを取得する際にnotify thenブロックによって目覚めされないので、助けになるかもしれません。 – Yakk

+0

はそれを得ました。説明をお寄せいただきありがとうございます。 – Steephen

関連する問題