2016-06-28 14 views
0

私はzeromq-4.1.4ライブラリーとcppzmqをリアルタイム高速サーバーと低速クライアントにインストールしています。ZMQ_SUB(フィルターなし)でZMQ_CONFLATEが機能しない

クライアントとサーバーの両方に、パブリッシュとサブスクライブ用の2つのポートがあり、TCP-IP経由で通信します。

サーバーは、それ自身の速い速度でメッセージを送信します。クライアントは最新のメッセージを受信し、計算が遅くなり、メッセージをサーバーに戻します。受信したメッセージがあれば、サーバーはそのメッセージを読み取り、処理します。

古いメッセージは新しいもので上書きされないという問題があります。クライアントは常に古いメッセージを出力し、サーバーの電源を切っても、クライアントの受信バッファーからメッセージが引き続きキューに入れられます。

どうしてですか? ZMQ_CONFLATEが設定されています。それはちょうど働くべきではないか?

回避策として、クライアントをワーカースレッドに入れて最大速度で作業し、最後のメッセージを手動で保存します。しかし、これはオーバーヘッドです。これはまさにzeromqがメッセージを送受信するときに私が理解する限りです。

クライアント/サーバー・コードが同じである

void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags) 
{ 
    flags_ = flags; 
    int confl = 1; 

    // Prepare our context 
    context_ = new zmq::context_t(1); 

    // Prepare ZMQ publisher 
    publisher_ = new zmq::socket_t(*context_, ZMQ_PUB); 
    publisher_->bind(pubAddress); 
    publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 

    // Prepare ZMQ subscriber 
    subscriber_ = new zmq::socket_t(*this->context_, ZMQ_SUB); 
    subscriber_->connect(subAddress); 
    subscriber_->setsockopt(ZMQ_SUBSCRIBE, "", 0); 
    subscriber_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REP); 
    syncService_->bind(syncAddress); 
    } 

    if (flags_ & ZMQ_SYNC_SUB) 
    { 
    // synchronize with publisher 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REQ); 
    syncService_->connect(syncAddress); 

    // - send a synchronization request 
    zmq::message_t message(0); 
    syncService_->send(message); 

    // - wait for synchronization reply 
    zmq::message_t update; 
    syncService_->recv(&update); 
    } 
} 

void ZeromqMessenger::sync() 
{ 
    if (connected_) 
    return; 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    //std::cout << "Waiting for subscribers" << std::endl; 
    if (subscribers_ < subscribers_expected_) 
    { 
     // - wait for synchronization request 
     zmq::message_t update; 
     if (syncService_->recv(&update, ZMQ_DONTWAIT)) 
     { 
     // - send synchronization reply 
     zmq::message_t message(0); 
     syncService_->send(message); 

     subscribers_++; 
     } 
    } 

    if (subscribers_ == subscribers_expected_) 
     connected_ = true; 
    } 
} 

void ZeromqMessenger::send(const void* data, int size) const 
{ 
    zmq::message_t message(size); 
    memcpy(message.data(), data, size); 
    publisher_->send(message); 
} 

bool ZeromqMessenger::recv(void *data, int size, int flags) const 
{ 
    zmq::message_t update; 
    bool received = subscriber_->recv(&update, flags); 
    if(received) 
    memcpy(data, update.data(), size); 
    return received; 
} 

答えて

0

私はネジ付きバージョンを実装し、それだけで正常に動作します。これはグローバル変数を持つ非常に粗末な実装であり、洗練されていますが、少なくとも動作します。

#include <zmq_messenger.h> 
#include <iostream> 
#include <thread> 
#include <mutex> 

std::string gSubAddress; 
std::mutex gMtx; 
const int gSize = 20*sizeof(double); 
char gData[gSize]; 

void *worker_routine (void *context) 
{ 
    // Prepare ZMQ subscriber 
    int confl = 1; 
    zmq::socket_t* subscriber = new zmq::socket_t(*(zmq::context_t*)context, ZMQ_SUB); 
    subscriber->connect(gSubAddress.c_str()); 
    subscriber->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 
    subscriber->setsockopt(ZMQ_SUBSCRIBE, "", 0); 

    while (1) 
    { 
    zmq::message_t update; 
    bool received = subscriber->recv(&update, ZMQ_DONTWAIT); 
    if(received) 
    { 
     gMtx.lock(); 
     memcpy(gData, update.data(), gSize); 
     gMtx.unlock(); 
    } 
    } 
    zmq_close(subscriber); 
    return NULL; 
} 

void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags) 
{ 
    flags_ = flags; 
    int confl = 1; 

    // Prepare our context 
    context_ = new zmq::context_t(1); 

    // Prepare ZMQ publisher 
    publisher_ = new zmq::socket_t(*context_, ZMQ_PUB); 
    publisher_->bind(pubAddress); 
    publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message 

    gSubAddress = std::string(subAddress); 
    pthread_create (&subscriber_worker_, NULL, worker_routine, context_); 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REP); 
    syncService_->bind(syncAddress); 
    } 

    if (flags_ & ZMQ_SYNC_SUB) 
    { 
    //std::cout << "Trying to connect" << std::endl; 

    // synchronize with publisher 
    syncService_ = new zmq::socket_t(*context_, ZMQ_REQ); 
    syncService_->connect(syncAddress); 

    // - send a synchronization request 
    zmq::message_t message(0); 
    syncService_->send(message); 

    // - wait for synchronization reply 
    zmq::message_t update; 
    syncService_->recv(&update); 

    // Third, get our updates and report how many we got 
    //std::cout << "Ready to receive" << std::endl; 
    } 
} 

void ZeromqMessenger::sync() 
{ 
    //std::cout << "sync" << std::endl; 
    if (connected_) 
    return; 

    if (flags_ & ZMQ_SYNC_PUB) 
    { 
    //std::cout << "Waiting for subscribers" << std::endl; 
    if (subscribers_ < subscribers_expected_) 
    { 
     // - wait for synchronization request 
     zmq::message_t update; 
     if (syncService_->recv(&update, ZMQ_DONTWAIT)) 
     { 
     // - send synchronization reply 
     zmq::message_t message(0); 
     syncService_->send(message); 

     subscribers_++; 
     } 
    } 

    if (subscribers_ == subscribers_expected_) 
     connected_ = true; 

    //std::cout << subscribers_ << " subscriber(s) connected" << std::endl; 
    } 
} 

void ZeromqMessenger::send(const void* data, int size) const 
{ 
    zmq::message_t message(size); 
    memcpy(message.data(), data, size); 
    publisher_->send(message); 
} 

bool ZeromqMessenger::recv(void *data, int size, int flags) const 
{ 
    assert(gSize == size); 
    gMtx.lock(); 
    memcpy(data, gData, size); 
    gMtx.unlock(); 
    return true; 
} 
関連する問題