2016-10-10 26 views
0

私はディーラー< - >ルーターの設定でNetMQ v4を使用しています。問題のない方向にメッセージを非同期で送受信できます。ZeroMQまたはNetMQのルータソケットから送受信するにはどうしたらいいですか?

私は今、サーバ(ルータ)はすべての着信メッセージをリッスンするが、それはまた、接続されたクライアント(ディーラー)のいずれかに需要ブロードキャストメッセージにする必要があります。抽象化の中にこれを正式なものにしたいです

私はサブスクライバがサーバーにもメッセージを送信する必要があるので、Pub < - >サブソケットを使用しないようにしています。私が達成しようとしているものに最も近いパターンは、WebSocketのクライアントとサーバー間の通信です。

クライアントメッセージを聞いての最初の部分が何かに行われています。

using (var server = new RouterSocket("@tcp://*:80")) 
{ 
    var addresses = new HashSet<string>(); 
    while (true) 
    { 
     var msg = server.ReceiveMultipartMessage(); 

     var address = Encoding.UTF8.GetString(msg[0].Buffer); 
     var payload = Encoding.UTF8.GetString(msg[2].Buffer); 
     Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload); 

     var contains = addresses.Contains(address); 
     if (!contains) { addresses.Add(address); }    

     msg.Clear(); 
     msg.Append(address); 
     msg.AppendEmptyFrame(); 
     msg.Append("Reply for: " + address); 
     server.SendMultipartMessage(msg); 
    } 
} 

今、ソケットはスレッドセーフされていないことを考えると、私はから来る(メッセージをブロードキャストする方法を見つけることにこだわっていますオンデマンドで異なるスレッド)をすべてのクライアントに提供します。

ループでは、おそらくループの中で設定されたタイムアウトを使用して、ブロードキャストメッセージのキューをチェックしてから、そのようなメッセージを送信する各クライアントをループすることができます。ような何か:何とか

using (var server = new RouterSocket("@tcp://*:80")) 
{ 
    var addresses = new HashSet<string>(); 

    var msg = new NetMQMessage(); 
    while (true) 
    { 
     var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg); 
     if (!clientHasMsg) 
     { 
      // Check any incoming broacast then loop through all the clients 
      // sending each the brodcast msg 
      var broadMsg = new NetMQMessage(); 
      foreach (var item in addresses) 
      { 
       broadMsg.Append(item); 
       broadMsg.AppendEmptyFrame(); 
       broadMsg.Append("This is a broadcast"); 
       server.SendMultipartMessage(broadMsg); 
       broadMsg.Clear(); 
      } 

      // Go back into the loop waiting for client messages 
      continue; 
     } 

     var address = Encoding.UTF8.GetString(msg[0].Buffer); 
     var payload = Encoding.UTF8.GetString(msg[2].Buffer); 
     Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload); 

     var contains = addresses.Contains(address); 
     if (!contains) { addresses.Add(address); } 

     msg.Clear(); 
     msg.Append(address); 
     msg.AppendEmptyFrame(); 
     msg.Append("Reply for: " + address); 
     server.SendMultipartMessage(msg); 
    } 
} 

これは、右の主な要因に感じることはありません。

  • 良い値がタイムアウトのためにどのような値はありますか? 1秒、100ms等;
  • このプログラムは、1秒間に何千ものメッセージを送信するたびに100k +クライアントを接続するために使用されるため、最も効率的な/実行中のソリューションですか。

これに対する最善のアプローチは、非常に高く評価されています。

答えて

1

マルチプロデューサの単一コンシューマキューであるnetmqqueueを使用できます。 NetMQPollerに追加し、ロックなしで複数のスレッドからエンキューできます。

+0

'Device'であなたのブログを読んでいて、' Queue'を言及するまでは良い選択だと思っていました:-) http://netmq.readthedocsのいくつかの行以外の例がありますか? io)? – MaYaN

+0

さて、もう1つの例を追加しなくても、NetMQQueue とNetMQSschedulerの違いは何ですか? v4ではスケジューラが廃止されましたか? – MaYaN

+0

NetMQSchedulerは廃止されました(NetMQPollerの一部になりました)とにかく、NetMQSchedulerはタスクのキューです。NetMQQueueはどのタイプのキューでもあります。 – somdoron

0

私はPUB/SUBが100k +クライアントの要件に適切なアプローチだと思います。それにもかかわらず、それはあなたがサーバーに戻って通信することができないという意味ではありません:DEALER/ROUTERを使用してください。なぜこの解決策は受け入れられないと思いますか?

+0

私は正しく理解しているかどうかわかりません。上記のソリューションに基づいてDealer/Routerを使用してPub/Subを行うのはあなたにとって合理的だと言っていますか? – MaYaN

関連する問題