私はzeromq-4.1.4ライブラリーとcppzmqをリアルタイム高速サーバーと低速クライアントにインストールしています。ZMQ_SUB(フィルターなし)でZMQ_CONFLATEが機能しない
クライアントとサーバーの両方に、パブリッシュとサブスクライブ用の2つのポートがあり、TCP-IP経由で通信します。
サーバーは、それ自身の速い速度でメッセージを送信します。クライアントは最新のメッセージを受信し、計算が遅くなり、メッセージをサーバーに戻します。受信したメッセージがあれば、サーバーはそのメッセージを読み取り、処理します。
古いメッセージは新しいもので上書きされないという問題があります。クライアントは常に古いメッセージを出力し、サーバーの電源を切っても、クライアントの受信バッファーからメッセージが引き続きキューに入れられます。
どうしてですか? ZMQ_CONFLATEが設定されています。それはちょうど働くべきではないか?
回避策として、クライアントをワーカースレッドに入れて最大速度で作業し、最後のメッセージを手動で保存します。しかし、これはオーバーヘッドです。これはまさにzeromqがメッセージを送受信するときに私が理解する限りです。
クライアント/サーバー・コードが同じである
:void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
flags_ = flags;
int confl = 1;
// Prepare our context
context_ = new zmq::context_t(1);
// Prepare ZMQ publisher
publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
publisher_->bind(pubAddress);
publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
// Prepare ZMQ subscriber
subscriber_ = new zmq::socket_t(*this->context_, ZMQ_SUB);
subscriber_->connect(subAddress);
subscriber_->setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
if (flags_ & ZMQ_SYNC_PUB)
{
syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
syncService_->bind(syncAddress);
}
if (flags_ & ZMQ_SYNC_SUB)
{
// synchronize with publisher
syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
syncService_->connect(syncAddress);
// - send a synchronization request
zmq::message_t message(0);
syncService_->send(message);
// - wait for synchronization reply
zmq::message_t update;
syncService_->recv(&update);
}
}
void ZeromqMessenger::sync()
{
if (connected_)
return;
if (flags_ & ZMQ_SYNC_PUB)
{
//std::cout << "Waiting for subscribers" << std::endl;
if (subscribers_ < subscribers_expected_)
{
// - wait for synchronization request
zmq::message_t update;
if (syncService_->recv(&update, ZMQ_DONTWAIT))
{
// - send synchronization reply
zmq::message_t message(0);
syncService_->send(message);
subscribers_++;
}
}
if (subscribers_ == subscribers_expected_)
connected_ = true;
}
}
void ZeromqMessenger::send(const void* data, int size) const
{
zmq::message_t message(size);
memcpy(message.data(), data, size);
publisher_->send(message);
}
bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
zmq::message_t update;
bool received = subscriber_->recv(&update, flags);
if(received)
memcpy(data, update.data(), size);
return received;
}