2009-09-19 16 views
7

あるスレッド(A)から別のスレッド(B)にメッセージを渡すためのキューが必要ですが、一般に許可しているので、メッセージを処理する必要があるので私の状況ではかなり致命的なケース、スレッドは本当に停止してスペアルームを待つ。マルチスレッドシングルリーダシングルライタFIFOキュー

  • はAだけのアイテムを追加し、それが
  • 項目を追加することができるようにのみ、スレッドBがそれらを読み込み
  • スレッドAが、しかし、スレッドBがパフォーマンス重要ではなく、ブロックしてはいけません必ず成功しなければならないので、キュースレッド(システム上のメモリ不足の短い)サイズの上限
  • を持ってカントキューが空の場合はアイテムがあるまで、スレッドBが
+0

どのスレッドライブラリを使用していますか? pthreads? –

+0

boost :: threadといくつかのビットのプラットフォーム固有のコードがここにあります –

+0

あなたの目標は、ライタースレッドがアイテムをブロックまたはドロップすることを許可しないため、メモリが不足する可能性があります。したがって、キューの重要なサイズ制限に達すると、アイテムをドロップするか、ライタースレッドをブロックするかを決定する必要があります。さもなければ、プログラムが失敗して間接的にアイテムをドロップします:-) – mmmmmmmm

答えて

7

を処理するのを待つ必要がありますここではロックインを作成する方法ですC++の空きキュー:

http://www.ddj.com/hpc-high-performance-computing/210604448

しかし、あなたはそれが必要条件だと確信している、「スレッドAがブロックしてはならない」と言うとき? Windowsはリアルタイムオペレーティングシステムではありません(通常はLinuxでもありません)。スレッドAが使用可能なすべてのシステムメモリを使用できるようにするには、メモリを割り当てる必要があります(または、他の誰かが実行している間待つ必要があります)。 OS自体は、リストを操作するためにリーダとライターの両方がインプロセスロック(すなわち、非共有ミューテックス)を取った場合のものよりも優れたタイミング保証を提供することはできません。そしてメッセージを追加する最悪の場合は、メモリを取得するためにOSに行く必要があります。

要するに、あなたが好きでないキューには容量が固定されています。つまり、待ち時間の短いスレッドにメモリを割り当てる必要がないからです。

したがって、ロックフリーコードは一般的にブロック-yより少なくなりますが、メモリ割り当てのために保証されているわけではありません。ミューテックスのパフォーマンスは、本当に巨大処理するイベントのストリーム(ネットワークドライバを作成していて、メッセージは着信イーサネットパケットです)。

ので、擬似コードでは、私が試してみた最初のものは、次のようになります。私はロックフリーのためのコードを行くだろう、これはライターのスレッドで容認できない遅延を導入することを証明する場合のみ

Writer: 
    allocate message and fill it in 
    acquire lock 
     append node to intrusive list 
     signal condition variable 
    release lock 

Reader: 
    for(;;) 
     acquire lock 
      for(;;) 
       if there's a node 
        remove it 
        break 
       else 
        wait on condition variable 
       endif 
      endfor 
     release lock 
     process message 
     free message 
    endfor 

を、(私がうまくいけば適切なキューが既に存在していない限り)。

+0

より低いレベルでは、書き込みプロセスが追加され、読み取りプロセスが消費されるシングルリンクリストを使用できます。これは、書き込みプロセスがNULLポインタを非NULLに変更し、読み取りプロセスが非NULLをNULLに変更することで、ロックフリーにすることができます。小さなプライベートヒープは、リスト項目に対して良好な償却されたパフォーマンスを提供します。作家のmallocsと読者が解放されます。読者がスリープ状態になると、プロセスAから割り当てのブロック性質が隠れて、秘密ヒープを投機的に拡大する第3のプロセスCを提供することができます。 –

+0

あなたの例はデッドロックになります。読者はcondを待っている間、ロックを保持しています。これにより、ライターはロックとシグナリングを取得できません。条件変数を待って直ちに再度取得する前に、ロックを解除する必要があります。 –

+0

@Bobby:あなたは間違っています。条件変数で待機すると、待機中に関連するロックが解除され、待機から復帰する前に再取得されます。これは、「条件変数」が意味するものの一部です。使用しているAPIがそれを実行していない場合は、条件変数ではなく、セマフォに似ています。そして、APIがそれをすることは重要です。なぜなら、あなたのコードは、ロックの解放と状態の待機を開始することが原子的に起こるという事実に依存することができます - つまり、スレッドの前にロックの下で他のスレッドは何もできませんウェイター。 –

0

あなたの要件を検討したいかもしれません - 本当にAはキュー項目をすべて破棄できないのでしょうか?または、Bが連続した2つの要素をキューから取り出して、連続した項目ではないことが原因でイベントのシーケンスが何らかの形で誤って表示されることを避けたいのですか?

たとえば、これが何らかのデータロギングシステムであれば、レコードにギャップがないように思われますが、無制限のメモリがなければ、現実的には、あなたのキューの容量..

この場合、1つの解決策は、キューに置くことができる何らかの種類の特別な要素を持つことです。これは、項目を削除する必要があることを発見したAのケースを表します。基本的に1つの余分な要素を保持していますが、これはほとんどの場合nullです。 Aがキューに要素を追加するたびに、この余分な要素がヌルでない場合、それは入ります。Aがキューに空きがないことを検出した場合、この余分な要素を設定して ' 。

このように、Aはブロックしません。システムが非常にビジー状態のときに要素を削除できますが、要素が削除されたという事実を失うことはありません。キュースペースが利用可能になるとすぐにデータの欠落が発生した場所を示します。それから、プロセスBは、このオーバーランマーク要素をキューから抜き取ったことを発見したときに必要な処理を行います。

1

Visual Studio 2010では、このシナリオを非常にうまくサポートする2つの新しいライブラリ、Asynchronous Agents LibraryとParallel Pattern Libraryが追加されています。

エージェントライブラリがサポートしているか、非同期メッセージパッシングと「目標」にメッセージを送信するためと「ソース」

からunbounded_bufferを、メッセージを受信するためのメッセージ・ブロックが含まれている私は、あなたが探していると考えているものを提供していますテンプレートクラスであります用:

ミューテックスARでSTL < list>または< deque>を使用しないのはなぜ
#include <agents.h> 
#include <ppl.h> 
#include <iostream> 

using namespace ::Concurrency; 
using namespace ::std; 

int main() 
{ 
    //to hold our messages, the buffer is unbounded... 
    unbounded_buffer<int> buf1; 
    task_group tasks; 

    //thread 1 sends messages to the unbounded_buffer 
    //without blocking 
    tasks.run([&buf1](){ 
     for(int i = 0 ; i < 10000; ++i) 
     send(&buf1,i) 
    //signal exit 
    send(&buf1,-1); 
    }); 

    //thread 2 receives messages and blocks if there are none 

    tasks.run([&buf1](){ 
     int result; 
     while(result = receive(&buf1)!=-1) 
     { 
      cout << "I got a " << result << endl; 
     } 
    }); 

    //wait for the threads to end 
    tasks.wait(); 
} 
+2

これは本当にLinuxカテゴリで動作しますか? –

+0

FWIW、あなたの受信ループでは、!=が評価されるので、常に "I got a" –

1
  • 追加/削除? thread-safety of STLは不十分ですか?

  • ポインタを含む独自の(単体/二重の)リンクリストノードクラスを作成し、そこから追加/削除するアイテムを継承するのはなぜですか?したがって、追加の割り当てを不要にする。 threadA::add()threadB::remove()にいくつかのポインタを入れてみると完了です。あなたはpthreadsを使用している場合

  • (あなたは...あなたは本当に間違って何かをした場合を除き、ミューテックスの下で、スレッドA上のブロッキング効果は無視できるだろうとやってみたいと思いますが)、sem_post()sem_wait()をチェックしてください。 ThreadBが経由で無期限にブロックできるのは、threadAが何かをキューに入れるまでです。その後、threadAはsem_post()を呼び出します。それがthreadBを目覚めさせて仕事にします。その後、threadBはスリープ状態に戻ることができます。これは、threadA::add()threadB::remove()が完了する前に複数のようなものをサポートする非同期シグナリングを効率的に処理する方法です。

関連する問題