プロデューサ独自のSynchronizedBlockingQueueを持つ異なるスレッド。 各プロデューサはメッセージをそれ自身のキューに入れます。キューのいずれかのいずれかからのメッセージを取得し、プロセスを開始しますブローカなしでコンシューマにメッセージを渡す方法
消費者
別のスレッド。
プロデューサーとコンシューマーのコミュニケーションには、ブローカーが必要です。ボトルネックになる可能性があります。消費者がプロデューサーと開始プロセスから1つのメッセージを得る他の方法はありますか?
プロデューサ独自のSynchronizedBlockingQueueを持つ異なるスレッド。 各プロデューサはメッセージをそれ自身のキューに入れます。キューのいずれかのいずれかからのメッセージを取得し、プロセスを開始しますブローカなしでコンシューマにメッセージを渡す方法
消費者
別のスレッド。
プロデューサーとコンシューマーのコミュニケーションには、ブローカーが必要です。ボトルネックになる可能性があります。消費者がプロデューサーと開始プロセスから1つのメッセージを得る他の方法はありますか?
言語を指定していないので、私はAdaプログラミング言語を使用した一般的な例を提供すると思いました。この例では、コンシューマは単にプロデューサからメッセージを出力しますが、ここで説明したプロデューサ - コンシューマアーキテクチャが提供されます。
with Ada.Task_Identification; use Ada.Task_Identification;
package Multiple_Producer is
type Producer_Message is private;
protected type Buffer is
entry Set_Message (Item : in Producer_Message);
entry Get_Message (Item : out Producer_Message);
private
Msg : Producer_Message;
Is_New : Boolean := False;
end Buffer;
type Buf_Alias is access all Buffer;
type Buf_Array is array (Positive range <>) of aliased Buffer;
type Buf_Access is access all Buf_Array;
task type Producer is
entry Set_Buffer (Item : Buf_Alias);
entry Stop;
end Producer;
task Consumer is
entry Set_Buffers (Item : Buf_Access);
entry Stop;
end Consumer;
private
type Producer_Message is record
the_Task : Task_Id;
Value : Integer;
end record;
end Multiple_Producer;
with Ada.Text_IO; use Ada.Text_IO;
package body Multiple_Producer is
--------------
-- Producer --
--------------
task body Producer is
Message : Producer_Message := (Current_Task, 0);
The_Buf : Buf_Alias;
begin
accept Set_Buffer(Item : in Buf_Alias) do
The_Buf := Item;
end Set_Buffer;
loop
select
accept Stop;
exit;
else
delay 0.01;
The_Buf.Set_Message(Message);
Message.Value := Message.Value + 1;
end select;
end loop;
end Producer;
--------------
-- Consumer --
--------------
task body Consumer is
Message : Producer_Message;
Buffers : Buf_Access;
begin
accept Set_Buffers(Item : Buf_Access) do
Buffers := Item;
end Set_Buffers;
loop
select
accept Stop;
exit;
else
-- Poll the buffers
for I in Buffers'Range loop
select
Buffers(I).Get_Message(Message);
Put_Line(Image(Message.The_Task) & ": " &
Integer'Image(Message.Value));
or
delay 0.001;
end select;
end loop;
end select;
end loop;
end Consumer;
------------
-- Buffer --
------------
protected body Buffer is
-----------------
-- Set_Message --
-----------------
entry Set_Message (Item : in Producer_Message) when not Is_New is
begin
Msg := Item;
Is_New := True;
end Set_Message;
-----------------
-- Get_Message --
-----------------
entry Get_Message (Item : out Producer_Message) when Is_New is
begin
Item := Msg;
Is_New := False;
end Get_Message;
end Buffer;
end Multiple_Producer;
with Multiple_Producer; use Multiple_Producer;
procedure Main is
subtype Producer_Range is Positive range 1..5;
The_Producers : array(Producer_Range) of Producer;
The_Buffers : Buf_Access := new Buf_Array(Producer_Range);
begin
for I in Producer_Range loop
The_Producers(I).Set_Buffer(The_Buffers(I)'Access);
end loop;
Consumer.Set_Buffers(The_Buffers);
delay 4.0;
for P of The_Producers loop
P.Stop;
end loop;
Consumer.Stop;
end Main;
言語 - Java – user3805189
それぞれのプロデューサのキューを公開し、それぞれのプロデューサにすべてのプロデューサを最も重大なシナリオでポーリングする必要があります。私はむしろアーキテクチャがよりエレガントで(使い易い)、ボトルネックがあれば、ほとんどのブローカー(rabbitmq、activemq ...など)が分散アーキテクチャをサポートするため、ブローカーを使用したいと思います。それがなければ、あなたはそれを自分でやる必要があります。 – Adonis