2017-08-23 10 views
3

ZeroMQにPUB/SUBという問題があります。ZeroMQ SUBはメッセージを受信しません

すべてを接続した後、発行者は、すべてのメッセージを公開する(ソケットの送信メッセージがtrueを返す)が、SUB.recv()機能に永遠に彼らとブロックを受け取ることはありません。ここで

は、私が使用していますコードです:

void startPublisher() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB); 
    zmq_socket.bind("tcp://127.0.0.1:58951"); 

    zmq::message_t msg(3); 
    memcpy(msg.data(), "abc", 3); 

    for(int i = 0; i < 10; i++) 
     zmq_socket.send(msg); // <-- always true 
} 

void startSubscriber() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB); 

    zmq_socket.connect("tcp://127.0.0.1:58951"); 
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); // allow all messages 

    zmq::message_t msg(3); 
    zmq_socket.recv(&msg); // <-- blocks forever (message never received?) 
} 

も試してみました(私は、2つの異なるスレッドでこれらの2つの機能をruningて最初SUBスレッドを開始、いくつかの時間を待った後、出版社のスレッドを開始していますのでご注意くださいそれ以外の方法では、エンドレスループでメッセージを送信するが、動作しなかった)。

私はここで間違っていますか?

+2

サブスクライバは、ドキュメンテーション/例の一部として、そしてパブリッシャの最初に実行する必要があります。サブスクライバスレッドの開始とパブリッシャスレッドの開始の間に遅延を追加して、それが違いを生むかどうかを確認します。後に適切な同期を追加することができます。 "tcp:// *:58951" –

+0

こんにちは、私はちょうどこの問題を解決することができました...これは "遅い結合者"と呼ばれ、[ここ](http:// zguide .zeromq.org/page:all)を詳しく説明します。つまり、TCPハンドシェイクが完了するまでに時間がかかるため、接続が確立されます。しかし、あなたは絶対に正しかったです。 – carobnodrvo

+1

これは、サブとパブの開始点の間に適切なシンクメカニズムを提供することで解決できます。 –

答えて

3

あなたの例に基づいて、次のコードが私に役立ちます。 問題は、PUB/SUBパターンが遅い結合子であることです。つまり、PUBソケットをバインドしてメッセージを送信する前にしばらく待つ必要があります。

#include <thread> 
#include <zmq.hpp> 
#include <iostream> 
#include <unistd.h> 
void startPublisher() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_PUB); 
    zmq_socket.bind("tcp://127.0.0.1:58951"); 
    usleep(100000); // Sending message too fast after connexion will result in dropped message 
    zmq::message_t msg(3); 
    for(int i = 0; i < 10; i++) { 
     memcpy(msg.data(), "abc", 3); 
     zmq_socket.send(msg); // <-- always true 
     msg.rebuild(3); 
     usleep(1); // Temporisation between message; not necessary 
    } 
} 
volatile bool run = false; 
void startSubscriber() 
{ 
    zmq::context_t zmq_context(1); 
    zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB); 
    zmq_socket.connect("tcp://127.0.0.1:58951"); 
    std::string TOPIC = ""; 
    zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // allow all messages 
    zmq_socket.setsockopt(ZMQ_RCVTIMEO, 1000); // Timeout to get out of the while loop 
    while(run) { 
     zmq::message_t msg; 
     int rc = zmq_socket.recv(&msg); // Works fine 
     if(rc) // Do no print trace when recv return from timeout 
      std::cout << std::string(static_cast<char*>(msg.data()), msg.size()) << std::endl; 
    } 
} 
int main() { 
    run = true; 
    std::thread t_sub(startSubscriber); 
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern 
    std::thread t_pub(startPublisher); 
    t_pub.join(); 
    sleep(1); 
    run = false; 
    t_sub.join(); 
} 
+3

'ZMQ_RCVTIMEO'を調整することから始めることは、適切なデザイナーの仕事からの脱出です。ブロッキングの練習(主に悪い)を使う必要がないため、明示的に非ブロッキングの '.recv()' -call構文と '.poll()' - インストルメンテーションがあります。プロフェッショナルレベルの分散コンピューティングシステムは決してブロックされるべきではありません(多くの明白な理由のために、コードはブロッキングステートの全期間、無限になる可能性があります)ので、コードデザイナーは簡単で、いくつかのSLOCのスペース制限がありました。 – user3666197

+0

RTOSのエキスパートだったので、これははっきりとした音ではありません。o) – user3666197

+0

はい、分散型PUB/SUBシステムの方が良いパターンです。私はちょうどOPの例のように近くにとどまっている間にゆっくりとしたジョイナーの特性を強調しようとしました。あなたは絶対に正しいです:)。しかし、スレッドPUB/SUBの簡単な例では、十分に良いようです。 – Clonk

関連する問題