2013-03-23 7 views
7

sendまたはrecvflagsパラメータにZMQ_DONTWAITフラグを指定すると、機能がスレッドをブロックしないことがマニュアルに記載されています。しかし、それは何とか私の場合には動作しません:ZMQ_DONTWAITフラグが機能しません。

  std::cout << "a"; 
      if(ToSend.try_pop(send)) 
      { 
       std::cout << "b"; 
       local.send(send.data(),send.size(),ZMQ_DONTWAIT); 
      } 
      std::cout << "c"; 
      if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT)) 
       std::cout << "Received: " << (char*)recv.data() << std::endl; 
      std::cout << "d" << std::endl; 

これは印刷されます。

abcdab 

私は物事を簡単にするために少しのクラスを作っ:すべてからダウンストライピング

Clientクラス( "簡略化のために使用されていない」もの、)

class client 
{ 
public: 
    client() 
    { 

    } 
    inline bool init(unsigned short threads = 1) 
    { 
     Running = true; 
     context = zmq_init (threads); 
     if(context == NULL) 
      return false; 
     socket = zmq_socket (context, ZMQ_REQ); 
     if(socket == NULL) 
      return false; 
     return true; 
    } 
    inline int connect(const char * address, unsigned short port) 
    { 
     return zmq_connect(socket,string_format("tcp://%s:%d",address,port).c_str()); 
    } 
    inline bool send (void *data, size_t len_, int flags_ = 0) 
    { 
     message_t request (len_); 
     memcpy ((void *) request.data(), data, len_); 
     int rc = zmq_send (socket, request.data(), request.size(), flags_); 
     if (rc >= 0) 
      return true; 
     if (rc == -1 && zmq_errno() == EAGAIN) 
      return false; 
     throw error_t(); 
    } 
    inline bool recv (void * data, size_t len_, int flags_) 
    { 
     message_t reply(len_); 
     int rc = zmq_recv (socket, reply.data(), len_, flags_); 
     if (rc >= 0) 
     { 
      memcpy (data,(void *)reply.data(), reply.size()); 
      return true; 
     } 
     if (rc == -1 && zmq_errno() == EAGAIN)return false; 
     throw error_t(); 
    } 
    inline bool IsRunning() 
    { 
     return Running; 
    } 
private: 
    void * context; 
    void * socket; 
    std::atomic<bool> Running; 
}; 

、ここでワーカースレッドです:

namespace Data 
{ 
    Concurrency::concurrent_queue <message_t> ToSend; 
    void Processor(char * address, unsigned short port, unsigned short threads) 
    { 
     client local; 
     if(!local.init(threads))return; 
     if(local.connect(address,port) != 0)return; 
     message_t recv(Networking::max_packet); 
     message_t send(Networking::max_packet); 
     while(local.IsRunning()) 
     { 
      std::cout << "a"; 
      if(ToSend.try_pop(send)) 
      { 
       std::cout << "b"; 
       local.send(send.data(),send.size(),ZMQ_DONTWAIT); 
      } 
      std::cout << "c"; 
      if(local.recv(recv.data(),Networking::max_packet,ZMQ_DONTWAIT)) 
       std::cout << "Received: " << (char*)recv.data() << std::endl; 
      std::cout << "d" << std::endl; 
     } 
    } 
}; 

ここに何とか問題があります。なぜ私はそれが機能していないのか分かりません。

これは私がワーカースレッドを起動する方法です:

int Thread(char * address, unsigned short port, unsigned short threads) 
{ 
    std::thread data(Data::Processor,address,port,threads); 
    data.detach(); 
    while(!Data::status){} 
    return Data::status; 
} 
int main(int argc, char* argv[]) 
{ 

    std::thread s(Server::RUN); 

    Client::message_t tosend(14); 
    memcpy((void*)tosend.data(),"Hello World !\0",14); 

    Client::Data::ToSend.push(tosend); 

    std::cout << Client::Thread("127.0.0.1",5555,1) << std::endl; 

    s.join(); 
    return 0; 
} 

このすべてが正しいように思えるので、なぜRECV /私のスレッドをブロック送っていますか?なぜフラグは機能しないのですか?

+0

送信機能のエラーを確認してください。 "zmq_send()関数はerrnoをEAGAINに設定して失敗します。" –

+0

EAGAINエラーの場合は、何もブロックしないでください。実際に何かを送るまでは何の帰りもありません。そしてそれがうまくいっていると、それはうまくいっています。 –

答えて

1

"abcd"と "ab"が表示されます。これは、あなたが質問の最後に示したコードは、あなただけの()を1回ToSend.pushをやった意味がコード

std::cout << "a"; 
if(ToSend.try_pop(send)) 
{ 
    std::cout << "b"; 

が2回実行されたことを意味します。

デバッガを接続して、どのスレッドがハングアップしていたのか、そのコールスタックが何であるのか確認しましたか?

  • mutex/critsectの後ろにstd :: coutを置くことで、一度に1つのスレッドだけが書き込みを行い、あなたのスレッドを報告します。
  • バッファリングのために手紙を逃さないように、それぞれの出力を独自の行に置きます。
  • データをブロックしていない場合、どのように受信する予定ですか?あなたはどこか選択/投票/ WaitForSingleObject呼び出しを持っていますか?
+0

hm権利:o私の頭の上に常に見える小さな問題:$ –

関連する問題