2012-04-20 4 views
0

私はZeroMQでしばらくプレイして、私が思いついた質問/問題をいくつか持っています。 ZeroMQの貢献者が誰でもチャイムすることができれば、またはライブラリを使用しているか現在使用している人なら誰でも感謝します。ZeroMQ/0MQプッシュ/プルメモリとルーティングの問題

*私は1台のルータ/フォワーダと2種類のクライアント(c1、c2)を持っているとします。私はルーティングデバイスを介してクライアント1からクライアント2にメッセージをプッシュしたい。ルータは、クライアント(ここではclient1)からメッセージを引き出し、それらを任意のサブスクライブされたクライアント(ここではclient2)にパブリッシュします。しかし、私は現在、そのようなメッセージを適切なクライアントにルーティングする唯一の方法は、pub/subを使用することです。a)メッセージボディとともにroutingToタグを送信することによって実行時にルーティングする方法を決定したい、b)push /私はc1とc2を押して正確に1つのポートに接続し、サブスクライブするために1つのポートに接続したいと思っています。私は何とかルータ側でpub/subを使用する必要がないために変更を加えることができますか、pub/subはメッセージが転送されるはずのルーティング側で知っていてもクライアントにルーティングする唯一の方法ですか?キューサイズが私が望んでいないhwmを超えると、そのpub/sub dropsメッセージを読んでいます。また、私は返答が必要ないので、不必要なオーバーヘッドを追加するので、リクエスト/返信のパターンを実装したくありません。コードの下に実行した後

* (プッシュ/プル - >パブ/サブ)は、すべてのメッセージを送信し、すべてのメッセージはまだメッセージを押し出す、明らかに巨大なメモリ・フットプリントがあります表示するクライアントを受け取った確認を受けているとプッシュソケットのキューにまだ膨大な量のメッセージが残っています。それはなぜですか、それを修正するために私は何ができますか?ここで

は私のコードです:

ROUTER:

class Program 
{ 
    static void Main(string[] args) 
    { 
     using (var context = new Context(1)) 
     { 
      using (Socket socketIn = context.Socket(SocketType.PULL), socketOut = context.Socket(SocketType.XPUB)) 
      { 
       socketIn.HWM = 10000; 
       socketOut.Bind("tcp://*:5560"); //forwards on this port 
       socketIn.Bind("tcp://*:5559"); //listens on this port 

       Console.WriteLine("Router started and running..."); 

       while (true) 
       { 
        //Receive Message 
        byte[] address = socketIn.Recv(); 
        byte[] body = socketIn.Recv(); 

        //Forward Message 
        socketOut.SendMore(address); 
        socketOut.Send(body); 
       } 
      } 
     } 
    } 
} 

CLIENT1:

class Program 
{ 
    static void Main(string[] args) 
    { 
     using (var context = new Context(1)) 
     { 
      using (Socket socketIn = context.Socket(SocketType.SUB), socketOut= context.Socket(SocketType.PUSH)) 
      { 
       byte[] iAM = Encoding.Unicode.GetBytes("Client1"); 
       byte[] youAre = Encoding.Unicode.GetBytes("Client2"); 
       byte[] msgBody = new byte[16]; 

       socketOut.HWM = 10000; 
       socketOut.Connect("tcp://localhost:5559"); 
       socketIn.Connect("tcp://localhost:5560"); 
       socketIn.Subscribe(iAM); 

       Console.WriteLine("Press key to kick off Test Client1 Sending Routine"); 
       Console.ReadLine(); 

       for (int counter = 1; counter <= 10000000; counter++) 
       { 
        //Send Message 
        socketOut.SendMore(youAre); 
        socketOut.Send(msgBody); 
       } 

       Console.WriteLine("Client1: Finished Sending"); 
       Console.ReadLine(); 
      } 
     } 
    } 
} 

がCLIENT2:

class Program 
{ 
    public static int msgCounter; 

    static void Main(string[] args) 
    { 
     msgCounter = 0; 

     using (var context = new Context(1)) 
     { 
      using (Socket socketIn = context.Socket(SocketType.SUB), socketOut = context.Socket(SocketType.PUSH)) 
      { 
       byte[] iAM = Encoding.Unicode.GetBytes("Client2"); 

       socketOut.Connect("tcp://localhost:5559"); 
       socketIn.Connect("tcp://localhost:5560"); 
       socketIn.Subscribe(iAM); 

       Console.WriteLine("Client2: Started Listening"); 

       //Receive First Message 
       byte[] address = socketIn.Recv(); 
       byte[] body = socketIn.Recv(); 
       msgCounter += 1; 

       Console.WriteLine("Received first message"); 

       Stopwatch watch = new Stopwatch(); 
       watch.Start(); 

       while (msgCounter < 10000000) 
       { 
        //Receive Message 
        address = socketIn.Recv(); 
        body = socketIn.Recv(); 
        msgCounter += 1; 
       } 

       watch.Stop(); 
       Console.WriteLine("Elapsed Time: " + watch.ElapsedMilliseconds + "ms"); 
       Console.ReadLine(); 
      } 
     } 
    } 
} 

答えて

2

私はあなたのアーキテクチャがかもしれないことを示唆するつもりですここから少し離れていてください。

1)正確に1つのPUSHと1つのPULLが必要な場合は、デバイスを真ん中から取り外します。ノードを追加するたびにプロデューサを更新する必要がないように、複数のコンシューマを管理するために、デバイスがアーキテクチャに明示的に追加されます。いつ/複数のコンシューマーやプロデューサーが必要なのかを知るには、デバイス上の各ノードに接続する必要があります。この場合、デバイスがあなたのソリューションを過度に複雑にしているように見えます。

2)「ルート」タグを持つというアイデアは、本当に私の心を揺さぶる。おそらく、他の統合オプションよりもメッセージングを選択する最大の理由は、どちらの側も(ブローカレス設計の場合にメッセージを送信する場所以外の)他の側について何も知る必要がないように、プロデューサとコンシューマを切り離すことです。ロジックに直接ルーティング情報を追加すると、これが破られます。

オーバーヘッドに関しては、私はこれを経験したことがありません。しかし、私はZeroMQのために.Netドライバを使ったことは一度もありませんでした。無神経な推測は.Netドライバ自体を見ることです。

+0

申し訳ありません申し訳ありませんが、アーキテクチャでは、同時にフレームワーク内の他のエンティティと通信するプロデューサ/コンシューマである多くのエンティティを保持します。したがって、ポイントツーポイントチャンネルは、あまりにも多くのチャンネルを導入するので、意味がない。2)ルーティング知識が望ましくないというあなたの考えに強く反する。私は、メッセージ本文をルーティングロジックから分離する必要があることに同意しますが、私が明らかにしたように、最終的にはフレームワーク全体にn個のエンティティがあり、各エンティティは特定のメッセージを送信する場所を知っています。 –

+0

大きな画像は私が異なるプロセス間のプロセス間通信を望んでおり、各プロセスがどの他のプロセスに対処しているかを知っているので、ルーティングロジックは多くの意味があります。盲目的にメッセージを送信し、接続されたすべてのエンティティがそれらを受け取って受信側で受信したメッセージが、実際にそのようなエンティティによって処理されるはずのメッセージかどうかを決定することは、帯域幅および計算能力の巨大な浪費である。 –

+0

Ah。それから謝罪する。元の記事では、実際にはルータの中央に統合しようとしていたクライアントノードが2つしかないと思っていました。おそらく、 "routeTo"は単なる意味論であり、私がぶら下げたものです。ルーターがルーティングの決定を下すのに使用している "トピック"として "routeTo"を記述することは正確でしょうか?もしそうなら、上記の答えを編集して、新しい理解をもって対処することができます。私は元の返答も残しておきます。そうすれば、このスレッドに来る人々は、これを読んだときに文脈全体を持つことができます。 – Shaun