2012-02-15 19 views
2

p2pアプリケーションの非同期ネットワークプログラミングで問題が発生しました。 私のアプリケーションは、サーバーとクライアントの両方でなければなりません。サーバーが リクエストを受信すると、それを他のサーバーkにブロードキャストする必要があります。私は、boost :: asioの例のHTTP Server 3 Exampleは、(クラスとして)非同期クライアントの実装でうまくいくと思っています。 (ブーストから:: ASIOクライアントの例)上記 クライアント・クラスは、以下である:クライアントがあるC++のboost/asioサーバー

ClientIO::ClientIO(boost::asio::io_service& io_service, tcp::resolver::iterator endpoint_iterator) 
: _io_service(io_service), 
     strand_(io_service), 
     resolver_(io_service), 
    socket_(io_service) 
    { 
    tcp::endpoint endpoint = *endpoint_iterator; 
     socket_.async_connect(endpoint, 
     boost::bind(&ClientIO::handle_after_connect, this, 
     boost::asio::placeholders::error, ++endpoint_iterator)); 
    } 

    void ClientIO::write(G3P mex) 
    { 
    _io_service.post(boost::bind(&ClientIO::writeMessage, this, mex)); 
    } 

    void ClientIO::writeMessage(G3P mex) 
    { 
    bool write_in_progress = !messages_queue_.empty(); 
    messages_queue_.push_back(mex); 
    if (!write_in_progress) 
    { 
    char* message=NULL; 
    boost::system::error_code ec; 
    if (messages_queue_.front().opcode == DATA) 
    { 
     message=(char*)malloc((10800)*sizeof(char)); 
    } 
    else 
     message=(char*)malloc(1024*sizeof(char)); 

    boost::asio::streambuf request; 
    std::ostream request_stream(&request); 
    serializeMessage(message, messages_queue_.front()); 
    request_stream << message; 
    boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)), 
    strand_.wrap(
    boost::bind(&ClientIO::handle_after_write, this, 
    boost::asio::placeholders::error))); 
     free(message); 
    } 
    } 

    void ClientIO::readMessage() 
    { 
boost::asio::async_read(socket_, data_, 
    boost::bind(&ClientIO::handle_after_read, this, 
boost::asio::placeholders::error, 
boost::asio::placeholders::bytes_transferred 
    )); 
    } 

    void ClientIO::stop() 
    { 
    socket_.shutdown(tcp::socket::shutdown_both); 
    socket_.close(); 
    } 

    void ClientIO::handle_after_connect(const boost::system::error_code& error, 
    tcp::resolver::iterator endpoint_iterator) 
    { 
    if (error) 
    { 
    if (endpoint_iterator != tcp::resolver::iterator()) 
    { 
     socket_.close(); 
     tcp::endpoint endpoint = *endpoint_iterator; 
     socket_.async_connect(endpoint, 
     boost::bind(&ClientIO::handle_after_connect,this, 
     boost::asio::placeholders::error, ++endpoint_iterator)); 
    } 
    } 
    else 
    { 
    } 
    } 

    void ClientIO::handle_after_read(const boost::system::error_code& error, std::size_t bytes_transferred) 
    { 
    if (bytes_transferred > 0) 
    { 
    std::istream response_stream(&data_); 
    std::string mex=""; 
    std::getline(response_stream, mex); 
    deserializeMessage(&reply_,mex); 
    if (reply_.opcode == REPL) 
    { 
     cout << "ack received" << endl; 
    } 
    } 
    if (error) 
    { 
    ERROR_MSG(error.message()); 
    } 
    } 

    void ClientIO::handle_after_write(const boost::system::error_code& error) 
    { 
    if (error) 
    { 
    //   ERROR_MSG("Error in write: " << error.message()); 
    } 
    else 
    { 
    messages_queue_.pop_front(); 
    if (!messages_queue_.empty()) 
    { 
     cout << "[w] handle after write" << endl; 
     char* message; 
     if (messages_queue_.front().opcode == DATA) 
     { 
    message=(char*)malloc((10800)*sizeof(char)); 
     } 
     else 
    message=(char*)malloc(1024*sizeof(char)); 
     boost::asio::streambuf request; 
     std::ostream request_stream(&request); 
     serializeMessage(message, messages_queue_.front()); 
     request_stream << message; 
     boost::asio::async_write(socket_, boost::asio::buffer(message, strlen(message)), 
     strand_.wrap(
     boost::bind(&ClientIO::handle_after_write, this, 
     boost::asio::placeholders::error))); 
    } 

    boost::asio::async_read_until(socket_, data_,"\r\n", 
      strand_.wrap(
      boost::bind(&ClientIO::handle_after_read, this, 
     boost::asio::placeholders::error, 
     boost::asio::placeholders::bytes_transferred))); 
    } 
    } 

    ClientIO::~ClientIO() 
    { 
    cout << "service stopped" << endl; 
    } 

}

新しい要求がサーバによって受信されると、それは新しいDataManagementクラスフォームの接続を開始しますそして、いくつかの計算の後、他のサーバーにキュー を書く(ここではただ1)各書き込みの上とでクラスを使用して私はio_serviceを作成した、ことを達成するために、ACK

client --write-> server ---write->\ 
            |--server1 
       server <--ACK----</ 

に対応していますDataManagementコンストラクタで次でそれをインスタンス化クラス変数としてスタンス(io_service_test):

DataManagement::DataManagement(){ 
    tcp::resolver resolver(io_service_test); 
    tcp::resolver::query query(remotehost, remoteport); 
    tcp::resolver::iterator iterator = resolver.resolve(query); 
    cluster = new cluster_head::ClusterIO(io_service_test,iterator); 
    io_service_test.run_one(); 
} 

そして、算出した後、データを送信する:

void DataManagement::sendTuple(. . .){ 

    . . . 
    io_service_test.reset(); 
    io_service_test.run(); 
    for (size_t i=0; i<ready_queue.size() ;i++) 
    { 
     cluster->write(fragTuple); 
    } 
} 

相手が同じであるHTTP PROXY3同じように変更された例(クライアントクラスなし)問題は時にはすべてがうまくいくことがあります。時には失敗し、スタックトレースを取得します。時には決して停止しない、またはセグメンテーションフォルトが発生することもあります。 私は問題がio_service管理とクラスメソッドの寿命には関係しないと思っていますが、わかりません。

  • この場合に適したいくつかの例がありますか、それを実装するダミーのクラスですか?
+0

バックトレースを提供できますか? – tr9sh

答えて

1

簡単にコードを確認したところ、以下の問題があります。

  1. それ
    • は、任意のデータを送信しないメッセージ
    • 通話boost::asio::async_write、メモリを割り当てますが、内部ASIOの要求のキューに要求を置くためClientIO::writeMessage方法は、受信者に誤った情報を送信つまり、メッセージはいつか送信されます。 boost::asio::bufferメッセージをコピーしません。それはそれへの参照だけを格納します。
    • は、free(messageと呼ばれます)。すなわち、メッセージに割り当てられたメモリは、キューに入れられた書込み要求が実行されるときに上書きされ得る。
  2. ClientIO::handle_after_writeのメモリリーク。 メッセージは割り当てられますが、解放されません。
  3. ClientIO::readMessageboost::asio::async_readメソッドは、strand_.wrapコールでラップされません。

ASIOのバッファの例のshared_const_bufferクラスのようなものを使用するには、#1と#2の問題が必要です。 boost::asio::async_writeコールと同じ方法でstrand_.wrapコールを使用するには、問題3を解決する必要があります。

+0

ありがとう@ megabyte1024 – 0x41ndrea

関連する問題