send
またはrecv
のflags
パラメータにZMQ_DONTWAIT
フラグを指定すると、機能がスレッドをブロックしないことがマニュアルに記載されています。しかし、それは何とか私の場合には動作しません:ZMQ_DONTWAITフラグが機能しません。
std::cout << "a";
if(ToSend.try_pop(send))
{
std::cout << "b";
local.send(send.data(),send.size(),ZMQ_DONTWAIT);
}
std::cout << "c";
if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT))
std::cout << "Received: " << (char*)recv.data() << std::endl;
std::cout << "d" << std::endl;
これは印刷されます。
abcdab
私は物事を簡単にするために少しのクラスを作っ:すべてからダウンストライピング
Clientクラス( "簡略化のために使用されていない」もの、)
class client
{
public:
client()
{
}
inline bool init(unsigned short threads = 1)
{
Running = true;
context = zmq_init (threads);
if(context == NULL)
return false;
socket = zmq_socket (context, ZMQ_REQ);
if(socket == NULL)
return false;
return true;
}
inline int connect(const char * address, unsigned short port)
{
return zmq_connect(socket,string_format("tcp://%s:%d",address,port).c_str());
}
inline bool send (void *data, size_t len_, int flags_ = 0)
{
message_t request (len_);
memcpy ((void *) request.data(), data, len_);
int rc = zmq_send (socket, request.data(), request.size(), flags_);
if (rc >= 0)
return true;
if (rc == -1 && zmq_errno() == EAGAIN)
return false;
throw error_t();
}
inline bool recv (void * data, size_t len_, int flags_)
{
message_t reply(len_);
int rc = zmq_recv (socket, reply.data(), len_, flags_);
if (rc >= 0)
{
memcpy (data,(void *)reply.data(), reply.size());
return true;
}
if (rc == -1 && zmq_errno() == EAGAIN)return false;
throw error_t();
}
inline bool IsRunning()
{
return Running;
}
private:
void * context;
void * socket;
std::atomic<bool> Running;
};
、ここでワーカースレッドです:
namespace Data
{
Concurrency::concurrent_queue <message_t> ToSend;
void Processor(char * address, unsigned short port, unsigned short threads)
{
client local;
if(!local.init(threads))return;
if(local.connect(address,port) != 0)return;
message_t recv(Networking::max_packet);
message_t send(Networking::max_packet);
while(local.IsRunning())
{
std::cout << "a";
if(ToSend.try_pop(send))
{
std::cout << "b";
local.send(send.data(),send.size(),ZMQ_DONTWAIT);
}
std::cout << "c";
if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT))
std::cout << "Received: " << (char*)recv.data() << std::endl;
std::cout << "d" << std::endl;
}
}
};
ここに何とか問題があります。なぜ私はそれが機能していないのか分かりません。
これは私がワーカースレッドを起動する方法です:
int Thread(char * address, unsigned short port, unsigned short threads)
{
std::thread data(Data::Processor,address,port,threads);
data.detach();
while(!Data::status){}
return Data::status;
}
int main(int argc, char* argv[])
{
std::thread s(Server::RUN);
Client::message_t tosend(14);
memcpy((void*)tosend.data(),"Hello World !\0",14);
Client::Data::ToSend.push(tosend);
std::cout << Client::Thread("127.0.0.1",5555,1) << std::endl;
s.join();
return 0;
}
このすべてが正しいように思えるので、なぜRECV /私のスレッドをブロック送っていますか?なぜフラグは機能しないのですか?
送信機能のエラーを確認してください。 "zmq_send()関数はerrnoをEAGAINに設定して失敗します。" –
EAGAINエラーの場合は、何もブロックしないでください。実際に何かを送るまでは何の帰りもありません。そしてそれがうまくいっていると、それはうまくいっています。 –