2011-01-31 11 views
43

TThreadedQueue(Generics.Collections)を単一プロデューサの複数コンシューマスキームで使用しようとしています。 (Delphi-XE)である。 アイデアは、オブジェクトをキューにプッシュし、複数のワーカースレッドにキューを排水させることです。TThreadedQueueは複数のコンシューマーに対応していませんか?

ただし、期待どおりに動作しません。 2つ以上のワーカースレッドがPopItemを呼び出すと、TThreadedQueueからアクセス違反がスローされます。

PopItemの呼び出しがクリティカルセクションでシリアル化されている場合、すべて正常です。

確かにTThreadedQueueは複数のコンシューマを処理できなければならないので、何か不足していますか、これはTThreadedQueueの純粋なバグですか?

ここでは、エラーを生成する簡単な例を示します。

program TestThreadedQueue; 

{$APPTYPE CONSOLE} 

uses 
// FastMM4 in '..\..\..\FastMM4\FastMM4.pas', 
    Windows, 
    Messages, 
    Classes, 
    SysUtils, 
    SyncObjs, 
    Generics.Collections; 

type TThreadTaskMsg = 
     class(TObject) 
     private 
      threadID : integer; 
      threadMsg : string; 
     public 
      Constructor Create(ID : integer; const msg : string); 
     end; 

type TThreadReader = 
     class(TThread) 
     private 
      fPopQueue : TThreadedQueue<TObject>; 
      fSync  : TCriticalSection; 
      fMsg  : TThreadTaskMsg; 
      fException : Exception; 
      procedure DoSync; 
      procedure DoHandleException; 
     public 
      Constructor Create(popQueue : TThreadedQueue<TObject>; 
           sync  : TCriticalSection); 
      procedure Execute; override; 
     end; 

Constructor TThreadReader.Create(popQueue : TThreadedQueue<TObject>; 
            sync  : TCriticalSection); 
begin 
    fPopQueue:=   popQueue; 
    fMsg:=     nil; 
    fSync:=    sync; 
    Self.FreeOnTerminate:= FALSE; 
    fException:=   nil; 

    Inherited Create(FALSE); 
end; 

procedure TThreadReader.DoSync ; 
begin 
    WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId)); 
end; 

procedure TThreadReader.DoHandleException; 
begin 
    WriteLn('Exception ->' + fException.Message); 
end; 

procedure TThreadReader.Execute; 
var signal : TWaitResult; 
begin 
    NameThreadForDebugging('QueuePop worker'); 
    while not Terminated do 
    begin 
    try 
     {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. } 
     Sleep(20); 
     {- Serializing calls to PopItem works } 
     if Assigned(fSync) then fSync.Enter; 
     try 
     signal:= fPopQueue.PopItem(TObject(fMsg)); 
     finally 
     if Assigned(fSync) then fSync.Release; 
     end; 
     if (signal = wrSignaled) then 
     begin 
     try 
      if Assigned(fMsg) then 
      begin 
      fMsg.threadMsg:= '<Thread id :' +IntToStr(Self.threadId) + '>'; 
      fMsg.Free; // We are just dumping the message in this test 
      //Synchronize(Self.DoSync); 
      //PostMessage(fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0); 
      end; 
     except 
      on E:Exception do begin 
      end; 
     end; 
     end; 
     except 
     FException:= Exception(ExceptObject); 
     try 
     if not (FException is EAbort) then 
     begin 
      {Synchronize(} DoHandleException; //); 
     end; 
     finally 
     FException:= nil; 
     end; 
    end; 
    end; 
end; 

Constructor TThreadTaskMsg.Create(ID : Integer; Const msg : string); 
begin 
    Inherited Create; 

    threadID:= ID; 
    threadMsg:= msg; 
end; 

var 
    fSync : TCriticalSection; 
    fThreadQueue : TThreadedQueue<TObject>; 
    fReaderArr : array[1..4] of TThreadReader; 
    i : integer; 

begin 
    try 
    IsMultiThread:= TRUE; 

    fSync:=  TCriticalSection.Create; 
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100); 
    try 
     {- Calling without fSync throws exceptions when two or more threads calls PopItem 
     at the same time } 
     WriteLn('Creating worker threads ...'); 
     for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,Nil); 
     {- Calling with fSync works ! } 
     //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create(fThreadQueue,fSync); 
     WriteLn('Init done. Pushing items ...'); 

     for i:= 1 to 100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 

     ReadLn; 

    finally 
     for i:= 1 to 4 do fReaderArr[i].Free; 
     fThreadQueue.Free; 
     fSync.Free; 
    end; 

    except 
    on E: Exception do 
     begin 
     Writeln(E.ClassName, ': ', E.Message); 
     ReadLn; 
     end; 
    end; 
end. 

更新:TThreadedQueueは、Delphi XE2に固定されてクラッシュさせTMonitorでエラーが発生しました。

更新2:上記のテストでは、キューが空の状態であることを示しています。 Darian Miller氏は、キューをフル状態にしてもXE2のエラーを再現できることを発見しました。エラーはもう一度TMonitorにあります。詳細は下記の彼の答えを見てください。また、QC101114へのリンクもあります。

アップデート3: デルファイXE2更新4でTThreadedQueueの問題を治すでしょうTMonitorのために発表の修正がありました。これまでのテストでは、TThreadedQueueのエラーは再現できません。 キューが空でいっぱいの場合、単一のプロデューサ/複数のコンシューマスレッドをテストしました。 複数のプロデューサ/複数のコンシューマもテスト済みです。私は読者スレッドとライタースレッドを1から100に変えました。しかし、歴史を知って、私はTMonitorを壊すために他人を挑戦する。

+4

こんにちは! StackOverflowへようこそ。これは良い質問ですが、コードが少し違って投稿されたかどうかをテストする方が簡単かもしれません。対応するDFMなしで.pasの半分のフォームを追加しました。これは、私たちが複製して調査することを困難にしています。問題はUI関連のようではないので、これをコンソールアプリケーションに減らす方法はありますか?ありがとう。 –

+0

メイソン、コンソールアプリが終了しました。 –

+1

XE2にまだ問題があります。 –

答えて

19

TThreadedQueueまたはTMonitorのいずれかのバグであることは確かです。いずれにしても、それはRTLにあり、あなたのコードではありません。あなたはこれをQCレポートとして提出し、上記の例を "再現方法"のコードとして使用する必要があります。

+0

メイソン、ありがとう。私は誰か他の人が違う意見を持っていなければ明日QCします。 TMonitorにエラーがあるようです。 –

+7

QC#91246複数のコンシューマでTThreadedQueueが失敗します。あなたが好きなら、それに投票してください。 –

+5

QCReportへのリンク:[http://qc.embarcadero.com/wc/qcmain.aspx?d=91246] (http://qc.embarcadero.com/wc/qcmain.aspx?d=91246) – jachguate

10

スレッド、並列処理などの作業には、OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibraryを使用することをお勧めします。Primozは非常にうまく機能しており、多くの便利なドキュメントがあります。

+1

私はOmniThreadLibraryとAsyncCallsをAndreas Hausladenhttp://andy.jgknet.de/blog/bugfix-units/asynccalls-29-asynchronous-function-calls/で知っています。 –

1

TThreadedQueueは複数のコンシューマをサポートするとは思われません。これは、ヘルプファイルごとに、FIFOです。私は、1つのスレッドがプッシュし、別のスレッド(ただ1つ!)がポップしているという印象を受けています。

+8

FIFOはキューがどのように空になっているかを示す単なる方法です。キューからジョブを引き出すスレッドが1つしかないことを意味するものではありません。特に* ThreadedQueue *と呼ばれるときはそうではありません。 –

+2

プッシャーとポッパーが異なるスレッドに存在する可能性があるため、これはThreadedQueueと呼ばれます。マルチスレッドの世界では何も無料で提供されるわけではないので、ドキュメントが利用可能であれば、複数のプロデューサーや消費者サポートを述べていると思います。それは言及されていないので、私はそれが動作するはずではないと思う。 – Giel

+3

キューはモニタによって保護されています。モニター自体は、マルチスレッド環境で安全でなければなりません。キューが複数のコンシューマにとって安全でない場合は、キャッチ可能な例外をキャストする必要があります。 –

3

私はTThreadedQueueクラスを探しましたが、私のD2009ではそれを持っていないようです。私は正確にこれ以上自分自身を殺すつもりはない - デルファイのスレッドのサポートは、常に間違っている... errm ... '最適ではない'と私はTThreadedQueueも違うと思う:)

なぜジェネリックPCプロデューサー/コンシューマー)オブジェクト?

unit MinimalSemaphorePCqueue; 

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore. 

The semaphore count reflects the queue count 
'push' will always succeed unless memory runs out, then you're stuft anyway. 
'pop' has a timeout parameter as well as the address of where any received 
object is to be put. 
'pop' returns immediately with 'true' if there is an object on the queue 
available for it. 
'pop' blocks the caller if the queue is empty and the timeout is not 0. 
'pop' returns false if the timeout is exceeded before an object is available 
from the queue. 
'pop' returns true if an object is available from the queue before the timeout 
is exceeded. 
If multiple threads have called 'pop' and are blocked because the queue is 
empty, a single 'push' will make only one of the waiting threads ready. 


Methods to push/pop from the queue 
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call. 
When the handle is signaled, the 'peek' method will retrieve the queued object. 
} 
interface 

uses 
    Windows, Messages, SysUtils, Classes,syncObjs,contnrs; 


type 

pObject=^Tobject; 


TsemaphoreMailbox=class(TobjectQueue) 
private 
    countSema:Thandle; 
protected 
    access:TcriticalSection; 
public 
    property semaHandle:Thandle read countSema; 
    constructor create; virtual; 
    procedure push(aObject:Tobject); virtual; 
    function pop(pResObject:pObject;timeout:DWORD):boolean; virtual; 
    function peek(pResObject:pObject):boolean; virtual; 
    destructor destroy; override; 
end; 


implementation 

{ TsemaphoreMailbox } 

constructor TsemaphoreMailbox.create; 
begin 
{$IFDEF D2009} 
    inherited Create; 
{$ELSE} 
    inherited create; 
{$ENDIF} 
    access:=TcriticalSection.create; 
    countSema:=createSemaphore(nil,0,maxInt,nil); 
end; 

destructor TsemaphoreMailbox.destroy; 
begin 
    access.free; 
    closeHandle(countSema); 
    inherited; 
end; 

function TsemaphoreMailbox.pop(pResObject: pObject; 
    timeout: DWORD): boolean; 
// dequeues an object, if one is available on the queue. If the queue is empty, 
// the caller is blocked until either an object is pushed on or the timeout 
// period expires 
begin // wait for a unit from the semaphore 
    result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout)); 
    if result then // if a unit was supplied before the timeout, 
    begin 
    access.acquire; 
    try 
     pResObject^:=inherited pop; // get an object from the queue 
    finally 
     access.release; 
    end; 
    end; 
end; 

procedure TsemaphoreMailbox.push(aObject: Tobject); 
// pushes an object onto the queue. If threads are waiting in a 'pop' call, 
// one of them is made ready. 
begin 
    access.acquire; 
    try 
    inherited push(aObject); // shove the object onto the queue 
    finally 
    access.release; 
    end; 
    releaseSemaphore(countSema,1,nil); // release one unit to semaphore 
end; 

function TsemaphoreMailbox.peek(pResObject: pObject): boolean; 
begin 
    access.acquire; 
    try 
    result:=(count>0); 
    if result then pResObject^:=inherited pop; // get an object from the queue 
    finally 
    access.release; 
    end; 
end; 
end. 
+0

あなたの答えに感謝します。私はXEのドキュメントでTThreadedQueueクラスを見て、私が持っていた実際のアプリケーションを簡単にテストしました。これはジェネリック医薬品の私の最初のショットであり、うまくいきませんでした。他のコメントから分かるように、バグはTMonitorクラスにあります。このバグは、誰かが並列マルチスレッドアプリケーションを構築した場合に意味を持ちます。私の実装は、押してポップするためのクリティカルセクションで保護された単純なキューを使用して終了しました。 –

4

あなたの例では、XE2の下で正常に動作するようですが、私たちはあなたのキューを埋める場合には、上のAVで失敗します。シンプルなTObjectQueueの子孫は罰金を行います - - 数十年のためにこれを使用して、複数の生産者/消費者と正常に動作しますPushItem。(あなたのキューの深さは1024に設定された)だけで100から1100にあなたのタスクの作成を増やし、再現する

を(XE2アップデート1でテスト)

for i:= 1 to 1100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 

これは私が最初にWindows 7の上で私のために毎回死にますそれをストレステストするために継続的なプッシュを試みたが、それはループ30で失敗した。ループ16で失敗した。そして、65で異なる間隔で失敗したが、ある時点で一貫して失敗した。

iLoop := 0; 
    while iLoop < 1000 do 
    begin 
    Inc(iLoop); 
    WriteLn('Loop: ' + IntToStr(iLoop)); 
    for i:= 1 to 100 do fThreadQueue.PushItem(TThreadTaskMsg.Create(i,'')); 
    end; 
+0

ああ、いや、何かの点では、キューが空のときと同じように、これもまた大きな打撃点になるかもしれない。私はSOの別の投稿でこれについてコメントしました。私の愚かなことはテストしないでください。私はそれを確認するためのいくつかのテストを行います。 –

+0

Yeppは、Windows 7の64ビット(XE2アップデート2)で、32ビットと64ビットの両方のexeで一貫して失敗します。 Q QCしますか、それともしますか? –

+0

として報告[QC101114](http://qc.embarcadero.com/wc/qcmain.aspx?d=101114) –

関連する問題