2013-03-15 20 views
6

私のアプリケーションでは、async boost udp/tcpソケット操作と同期を使用して評価しようとしています。私はデザインに似た例を見つけようとしていますが、正しいパスではないにしても、私のデザインに非同期オペレーションを適合させようとしているかもしれないと私に信じさせてくれたものは見つかりませんでした。Boost asio - 異なるサーバーへの複数のクライアント接続

複数の(読み取り:1〜10の)サーバーに接続し、それぞれ異なるプロトコルを使用して通信したいと考えています。私はこれらのサーバ接続のいずれかに通信する必要のあるデータを生成する4-5のスレッドを持っています。

私の現在の設計は同期的で、サーバー接続スレッドごとにio_serviceオブジェクトを使用し、次に生成スレッドと各接続スレッドの間にスレッドセーフキューを使用します。

このデザインはスループットパフォーマンスの点でスケーラブルではないようですが、これは最大限に活用したいものです。

この複数の接続をさまざまなサーバーパターンに提供する例はありますか。

答えて

2

ASIOで実装されたTCP/IP SSL/TLSを使用して6つの異なるサーバーに接続するクライアントを作成しました。 6つはすべて同じプロトコルを使用します。

SSLSocket.H

#pragma once 

#include <cstdlib> 
#include <iostream> 
#include <queue> 
#include <boost/bind.hpp> 
#include <boost/asio.hpp> 
#include <boost/asio/ssl.hpp> 
#include <boost/thread.hpp> 
#include <boost/thread/mutex.hpp> 
#include <boost/shared_ptr.hpp> 
using namespace std; 
// 
#include "BufferManagement.h" 
#include "Logger.h" 
#include "Common Classes\Locking.h" 
#include "Message.h" 

class SSLSocket; 
class ConcurrentMsgQueue; 

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 

typedef void (__stdcall *Callback)(const SSLSocket* pSSLS, const int bytesInMsg, const void* pBuf); 

// typedef std::vector<boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SocketVectorType; 

enum {MsgLenBytes = 4}; 

class SSLSocket 
{ 
    // This class handles all communications between the client and the server 
    // using TCP/IP SSL v1. The Boost ASIO (Asynchronous I/O) library is used to accomplish this. 
    // Initally written by Bob Bryan on 1/21/2013. 
    // 
public: 
    SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex, const LogLevel levelOfLog, const string& logFileName, const int bufMangLen); 
    ~SSLSocket(); 
    void Connect(SSLSocket* psSLS, const string& serverPath, string& port); 
    void SendToServer(const int bytesInMsg, Byte* pBuf); 
    void Stop(); 

    static void SetCallback(Callback callbackFunction) 
    { 
     // This method is required in order to be able to do a reverse pinvoke from C#. 
     // This callback function pointer is what is used to communicate back to the C# code. 
     CallbackFunction = callbackFunction; 
    } 

    static Byte* AllocateMem(int length) 
    { 
     // Allocate some memory. This method winds up getting called when the C# client needs to allocate some memory for a message. 
     Byte* pBuf = BufMang.GetPtr(length); 
     return pBuf; 
    } 
    // 
    static Logger Log; // Object used to log info to a file and/or to the console. 
    static Callback CallbackFunction; // Callback function object used to communicate with the worker thread in C#. 

private: 
    void InitAsynchIO(); 
    void HandleConnect(const boost::system::error_code& error); 
    void HandleHandshake(const boost::system::error_code& error); 
    void HandleFirstWrite(const boost::system::error_code& error, size_t bytes_transferred); 
    void HandleRead(const boost::system::error_code& error, size_t bytesTransferred); 
    // void HandleRead(const boost::system::error_code& error, size_t bytes_transferred); 
    void Terminate(); 
    void static RcvWorkerThread(SSLSocket* sSLS); 
    void static SendWorkerThread(SSLSocket* psSLS); 
    void ProcessSendRequests(); 
    void HandleWrite(const boost::system::error_code& error, size_t bytesTransferred); 
    static void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service); 
    // 
    struct Bytes 
    { 
     // Used to convert 4 bytes to an int. 
     unsigned char B1; 
     unsigned char B2; 
     unsigned char B3; 
     unsigned char B4; 
    }; 

    union Bytes4ToInt 
    { 
     // Converts 4 bytes to an int. 
     int IntVal; 
     Bytes B; 
    }; 

    inline int BytesToInt(const Byte * pBuf) 
    { 
     // This method converts 4 bytes from an array of bytes to a 4-byte int. 
     B2I.B.B1 = *pBuf++; 
     B2I.B.B2 = *pBuf++; 
     B2I.B.B3 = *pBuf++; 
     B2I.B.B4 = *pBuf; 
     int Value = B2I.IntVal; 
     return Value; 
    } 
    // 
    boost::thread_group WorkerThreads; // Used to handle creating threads. 
    CRITICAL_SECTION SocketLock; // Used in conjuction with the Locking object to handle single threading the code. 
    boost::asio::ssl::stream<boost::asio::ip::tcp::socket>* pSocket; // Pointer to the socket object. 
    Bytes4ToInt B2I; // Used to translate 4 bytes in the buffer to an int representing the number of bytes in the msg. 
    std::string sClientIp; // Client IP address. Used for logging. 
    unsigned short uiClientPort; // Port number. Used for logging. 
    // static MessageList* pRepMsgs; // Link list of the msgs to send to the server. 
    Byte* pDataBuf; // Pointer to the data for the current message to be read. 
    static boost::shared_ptr<boost::asio::io_service> IOService; // Object required for use by ASIO to perform certain functions. 
    static bool RcvThreadCreated; // Set when the rcv thread is created so that it won't try to create it again. 
    static int StaticInit; // Indicates whether or not the static members have been initialized or not. 
    static bool DisplayInHex; // Specifies whether to display a buffer in hex or not. 
    static BufferManagement BufMang; // Smart pointer to the buffer used to handle requests coming to and from the server for all sockets. 
    volatile static bool ReqAlive; // Used to indicate whether the request thread should die or not. 
    // static bool RepAlive; // Used to indicate whether the response thread should die or not. 
    static ConcurrentMsgQueue SendMsgQ; // Holds the messages waiting to be sent to the server. 
    static HANDLE hEvent; // Used for signalling between threads. 
}; 

ので、ここでのキーポイントである

#include "StdAfx.h" 
#include "SSLSocket.h" 

boost::shared_ptr<boost::asio::io_service> SSLSocket::IOService; 
int SSLSocket::StaticInit = 0; 
Callback SSLSocket::CallbackFunction; 
BufferManagement SSLSocket::BufMang; 
volatile bool SSLSocket::ReqAlive = true; 
Logger SSLSocket::Log; 
HANDLE SSLSocket::hEvent; 
bool SSLSocket::DisplayInHex; 
ConcurrentMsgQueue SSLSocket::SendMsgQ; 
bool SSLSocket::RcvThreadCreated = 0; 
BufferManagement* Message::pBufMang; 

SSLSocket::SSLSocket(const bool logToFile, const bool logToConsole, const bool displayInHex, 
    const LogLevel levelOfLog, const string& logFileName, const int bufMangLen) : pSocket(0) 
{ 
    // SSLSocket Constructor. 
    // If the static members have not been intialized yet, then initialize them. 
    if (!StaticInit) 
    { 
     DisplayInHex = displayInHex; 
     BufMang.Init(bufMangLen); 
     Message::SetBufMang(&BufMang); 
     // This constructor enables logging according to the vars passed in. 
     Log.Init(logToFile, logToConsole, levelOfLog, logFileName); 
     // Create the crit section object 
     // Locking::InitLocking(ReadLock); 
     // Locking::InitLocking(WriteLock); 
     StaticInit++; 
     hEvent = CreateEvent(NULL, false, false, NULL); 
     // Define the ASIO IO service object. 
     // IOService = new boost::shared_ptr<boost::asio::io_service>(new boost::asio::io_service); 
     boost::shared_ptr<boost::asio::io_service> IOServ(new boost::asio::io_service); 
     IOService = IOServ; 
    } 
} 

SSLSocket::~SSLSocket(void) 
{ 
    delete pSocket; 
    if (--StaticInit == 0) 
     CloseHandle(hEvent); 
} 

void SSLSocket::Connect(SSLSocket* psSLS, const string& serverPath, string& port) 
{ 
    // Connects to the server. 
    // serverPath - specifies the path to the server. Can be either an ip address or url. 
    // port - port server is listening on. 
    // 
    try 
    { 
     Locking CodeLock(SocketLock); // Single thread the code. 
     // If the user has tried to connect before, then make sure everything is clean before trying to do so again. 
     if (pSocket) 
     { 
     delete pSocket; 
     pSocket = 0; 
     }                         
     // If serverPath is a URL, then resolve the address. 
     // Note that this code expects the first server to always have a url. 
     if ((serverPath[0] < '0') || (serverPath[0] > '9')) // Assumes that the first char of the server path is not a number when resolving to an ip addr. 
     { 
     // Create the resolver and query objects to resolve the host name in serverPath to an ip address. 
     boost::asio::ip::tcp::resolver resolver(*IOService); 
     boost::asio::ip::tcp::resolver::query query(serverPath, port); 
     boost::asio::ip::tcp::resolver::iterator EndpointIterator = resolver.resolve(query); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificiate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // The thread we are on now, is most likely the user interface thread. Create a thread to handle all incoming socket work messages. 
     if (!RcvThreadCreated) 
     { 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::RcvWorkerThread, this)); 
      RcvThreadCreated = true; 
      WorkerThreads.create_thread(boost::bind(&SSLSocket::SendWorkerThread, this)); 
     } 
     // Try to connect to the server. Note - add timeout logic at some point. 
     boost::asio::async_connect(pSocket->lowest_layer(), EndpointIterator, 
      boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error)); 
     } 
     else 
     { 
     // serverPath is an ip address, so try to connect using that. 
     // 
     // Create an endpoint with the specified ip address. 
     const boost::asio::ip::address IP(boost::asio::ip::address::from_string(serverPath)); 
     int iport = atoi(port.c_str()); 
     const boost::asio::ip::tcp::endpoint EP(IP, iport); 
     // Set up an SSL context. 
     boost::asio::ssl::context ctx(*IOService, boost::asio::ssl::context::tlsv1_client); 
     // Specify to not verify the server certificiate right now. 
     ctx.set_verify_mode(boost::asio::ssl::context::verify_none); 
     // Init the socket object used to initially communicate with the server. 
     pSocket = new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(*IOService, ctx); 
     // 
     // Try to connect to the server. Note - add timeout logic at some point. 
     //pSocket->core_.engine_.do_connect(void*, int); 
     // pSocket->next_layer_.async_connect(EP, &SSLSocket::HandleConnect) 
     // pSocket->next_layer().async_connect(EP, &SSLSocket::HandleConnect); 
     boost::system::error_code EC; 
     pSocket->next_layer().connect(EP, EC); 
     if (EC) 
     { 
      // Log an error. This worker thread should exit gracefully after this. 
      stringstream ss; 
      ss << "SSLSocket::Connect: connect failed to " << sClientIp << " : " << uiClientPort << ". Error: " << EC.message() + ".\n"; 
      Log.LogString(ss.str(), LogError); 
     } 
     HandleConnect(EC); 
     // boost::asio::async_connect(pSocket->lowest_layer(), EP, 
     // boost::bind(&SSLSocket::HandleConnect, this, boost::asio::placeholders::error)); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::Connect: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::SendToServer(const int bytesInMsg, Byte* pBuf) 
{ 
    // This method creates a msg object and saves it in the SendMsgQ object. 
    // sends the number of bytes specified by bytesInMsg in pBuf to the server. 
    // 
    Message* pMsg = Message::GetMsg(this, bytesInMsg, pBuf); 
    SendMsgQ.Push(pMsg); 
    // Signal the send worker thread to wake up and send the msg to the server. 
    SetEvent(hEvent); 
} 


void SSLSocket::SendWorkerThread(SSLSocket* psSLS) 
{ 
    // This thread method that gets called to process the messages to be sent to the server. 
    // 
    // Since this has to be a static method, call a method on the class to handle server requests. 
    psSLS->ProcessSendRequests(); 
} 

void SSLSocket::ProcessSendRequests() 
{ 
    // This method handles sending msgs to the server. 
    // 
    std::stringstream ss; 
    DWORD WaitResult; 
    Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " started.\n", LogInfo); 
    // Loop until the user quits, or an error of some sort is thrown. 
    try 
    { 
     do 
     { 
     // If there are one or more msgs that need to be sent to a server, then send them out. 
     if (SendMsgQ.Count() > 0) 
     { 
      Message* pMsg = SendMsgQ.Front(); 
      SSLSocket* pSSL = pMsg->pSSL; 
      SendMsgQ.Pop(); 
      const Byte* pBuf = pMsg->pBuf; 
      const int BytesInMsg = pMsg->BytesInMsg; 
      boost::system::error_code Error; 
      { 
       Locking CodeLock(SocketLock); // Single thread the code. 
       boost::asio::async_write(*pSSL->pSocket, boost::asio::buffer(pBuf, BytesInMsg), boost::bind(&SSLSocket::HandleWrite, this, 
        boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
      } 
      ss << "SSLSocket::ProcessSendRequests: # bytes sent = " << BytesInMsg << "\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pBuf, BytesInMsg, DisplayInHex, LogDebug3); 
     } 
     else 
     { 
      // Nothing to send, so go into a wait state. 
      WaitResult = WaitForSingleObject(hEvent, INFINITE); 
      if (WaitResult != 0L) 
      { 
       Log.LogString("SSLSocket::ProcessSendRequests: WaitForSingleObject event error. Code = " + Logger::NumberToString(GetLastError()) + ". \n", LogError); 
      } 
     } 
     } while (ReqAlive); 
     Log.LogString("SSLSocket::ProcessSendRequests: Worker thread " + Logger::NumberToString(boost::this_thread::get_id()) + " done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::ProcessSendRequests: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. Nothing to do really since reading is handled by the HandleRead method. 

    std::stringstream ss; 
    try 
    { 
     if (error) 
     { 
     ss << "SSLSocket::HandleWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::RcvWorkerThread(SSLSocket* psSLS) 
{ 
    // This is the method that gets called when the receive thread is created by this class. 
    // This thread method focuses on processing messages received from the server. 
    // 
    // Since this has to be a static method, call a method on the class to handle server requests. 
    psSLS->InitAsynchIO(); 
} 

void SSLSocket::InitAsynchIO() 
{ 
    // This method is responsible for initiating asynch i/o. 
    boost::system::error_code Err; 
    string s; 
    stringstream ss; 
    // 
    try 
    { 
     ss << "SSLSocket::InitAsynchIO: Worker thread - " << Logger::NumberToString(boost::this_thread::get_id()) << " started.\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Enable the handlers for asynch i/o. The thread will hang here until the stop method has been called or an error occurs. 
     // Add a work object so the thread will be dedicated to handling asynch i/o. 
     boost::asio::io_service::work work(*IOService); 
     IOService->run(); 
     Log.LogString("SSLSocket::InitAsynchIO: receive worker thread done.\n", LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleConnect(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the connect request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     pSocket->async_handshake(boost::asio::ssl::stream_base::client, 
      boost::bind(&SSLSocket::HandleHandshake, this, boost::asio::placeholders::error)); 
     ss << "SSLSocket::HandleConnect: From worker thread " << Logger::NumberToString(boost::this_thread::get_id()) << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     } 
     else 
     { 
     // Log an error. This worker thread should exit gracefully after this. 
     ss << "SSLSocket::HandleConnect: connect failed to " << sClientIp << " : " << uiClientPort << ". Error: " << error.message() + ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::InitAsynchIO: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleHandshake(const boost::system::error_code& error) 
{ 
    // This method is called asynchronously when the server has responded to the handshake request. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // Try to send the first message that the server is expecting. This msg tells the server we want to start communicating. 
     // This is the only msg specified in the C++ code. All other msg processing is done in the C# code. 
     // 
     unsigned char Msg[27] = {0x17, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x41, 
      0x74, 0x74, 0x61, 0x63, 0x6b, 0x50, 0x6f, 0x6b, 0x65, 0x72, 0x02, 0x00, 0x65, 0x6e}; 
     boost::system::error_code Err; 

     sClientIp = pSocket->lowest_layer().remote_endpoint().address().to_string(); 
     uiClientPort = pSocket->lowest_layer().remote_endpoint().port(); 
     ReqAlive = true; 
     // boost::asio::async_write(*pSocket, boost::asio::buffer(Msg), boost::bind(&SSLSocket::HandleFirstWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     int Count = boost::asio::write(*pSocket, boost::asio::buffer(Msg), boost::asio::transfer_exactly(27), Err); 
     if (Err) 
     { 
      ss << "SSLSocket::HandleHandshake: write failed - " << error.message() << ".\n"; 
      Log.LogString(ss.str(), LogInfo); 
     } 
     HandleFirstWrite(Err, Count); 
     // boost::asio::async_write(pSocket, boost::asio::buffer(Msg, 27), boost::bind(&SSLSocket::HandleWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     ss.str(""); 
     ss << "SSLSocket::HandleHandshake: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     } 
     else 
     { 
     ss << "SSLSocket::HandleHandshake: failed - " << error.message() << ".\n"; 
     IOService->stop(); 
     } 
     Log.LogString(ss.str(), LogInfo); 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleHandshake: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleFirstWrite(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called after a msg has been written out to the socket. 
    std::stringstream ss; 
    try 
    { 
     if (!error) 
     { 
     // boost::asio::async_read(pSocket, boost::asio::buffer(reply_, bytesTransferred), boost::bind(&SSLSocket::handle_read, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::async_read(pSocket, boost::asio::buffer(reply_, 84), boost::bind(&SSLSocket::handle_read, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // Locking CodeLock(ReadLock); // Single thread the code. 
     // Signal the other threads that msgs are now ready to be sent and received. 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pRepBuf), boost::asio::transfer_exactly(4), boost::bind(&SSLSocket::HandleRead, 
     // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // 
     // Notify the UI that we are now connected. Create a 6 byte msg for this. 
     pDataBuf = BufMang.GetPtr(6); 
     BYTE* p = pDataBuf; 
     // Create msg type 500 
     *p = 244; 
     *++p = 1; 
     CallbackFunction(this, 2, (void*)pDataBuf); 
     // Get the 1st 4 bytes of the next msg, which is always the length of the that msg. 
     pDataBuf = BufMang.GetPtr(MsgLenBytes); 

     // int i1=1,i2=2,i3=3,i4=4,i5=5,i6=6,i7=7,i8=8,i9=9; 
     // (boost::bind(&nine_arguments,_9,_2,_1,_6,_3,_8,_4,_5,_7)) 
     //  (i1,i2,i3,i4,i5,i6,i7,i8,i9); 

     // boost::asio::read(*pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::asio::transfer_exactly(MsgLenBytes), Err); 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pReqBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, _1,_2,_3)) 
     // (this, pReqBuf, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred); 
     // boost::asio::async_read(*pSocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(ByteCount), boost::bind(&Client::handle_read, 
     //  this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::async_write(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleWrite, this, 
     // boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 

     Locking CodeLock(SocketLock); // Single thread the code. 
     boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, MsgLenBytes), boost::bind(&SSLSocket::HandleRead, this, 
      boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     } 
     else 
     { 
     ss << "SSLSocket::HandleFirstWrite: failed - " << error.message() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleFirstWrite: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::HandleRead(const boost::system::error_code& error, size_t bytesTransferred) 
{ 
    // This method is called to process an incomming message. 
    // 
    std::stringstream ss; 
    int ByteCount; 
    try 
    { 
     ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << ".\n"; 
     Log.LogString(ss.str(), LogInfo); 
     // Set to exit this thread if the user is done. 
     if (!ReqAlive) 
     { 
     // IOService->stop(); 
     return; 
     } 
     if (!error) 
     { 
     // Get the number of bytes in the message. 
     if (bytesTransferred == 4) 
     { 
      ByteCount = BytesToInt(pDataBuf); 
     } 
     else 
     { 
      // Call the C# callback method that will handle the message. 
      ss << "SSLSocket::HandleRead: From worker thread " << boost::this_thread::get_id() << "; # bytes transferred = " << bytesTransferred << ".\n"; 
      Log.LogString(ss.str(), LogDebug2); 
      Log.LogBuf(pDataBuf, (int)bytesTransferred, true, LogDebug3); 
      Log.LogString("SSLSocket::HandleRead: sending msg to the C# client.\n\n", LogDebug2); 
      CallbackFunction(this, bytesTransferred, (void*)pDataBuf); 
      // Prepare to read in the next message length. 
      ByteCount = MsgLenBytes; 
     } 
     pDataBuf = BufMang.GetPtr(ByteCount); 
     boost::system::error_code Err; 
     // boost::asio::async_read(pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead, 
      // this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     Locking CodeLock(SocketLock); // Single thread the code. 
     boost::asio::async_read(*pSocket, boost::asio::buffer(pDataBuf, ByteCount), boost::bind(&SSLSocket::HandleRead, 
      this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
     // boost::asio::read(pSocket, boost::asio::buffer(reply_), boost::asio::transfer_exactly(ByteCount), Err); 
     } 
     else 
     { 
     Log.LogString("SSLSocket::HandleRead failed: " + error.message() + "\n", LogError); 
     Stop(); 
     } 
    } 
    catch (std::exception& e) 
    { 
     stringstream ss; 
     ss << "SSLSocket::HandleRead: threw an error - " << e.what() << ".\n"; 
     Log.LogString(ss.str(), LogError); 
     Stop(); 
    } 
} 

void SSLSocket::Stop() 
{ 
    // This method calls the shutdown method on the socket in order to stop reads or writes that might be going on. If this is not done, then an exception will be thrown 
    // when it comes time to delete this object. 
    ReqAlive = false; 
    SetEvent(hEvent); 
    IOService->stop(); 
} 

SSLSocket.cpp:それは場合に役立ちますので、ここに私のコードです

  1. serに接続する場合verを初めて使用すると、SSLSocketクラスの新しいインスタンスが作成されます。 io_serviceオブジェクトは静的で、1回だけ作成されます。 SSLSocketクラスの6つのインスタンスすべてで使用されます。

  2. 6つのサーバーすべてでソケット通信に関係するすべての処理に2つのスレッドが使用されています。 1つのスレッドは、サーバーから受信したメッセージを処理するためのものです。もう一方のスレッドは、サーバーにメッセージを送信するために使用されます。

  3. このコードでは、SSL/TSLを使用しています。ストレートTCPを使用している場合は、SSLSocket :: Connectメソッドとssl #include行の3行を削除するだけです。

  4. HandleReadで使用される手法は、ダブル・リード・メソッドを使用します。最初の読み取りはバイト数を取得し(プロトコルは最初の4バイトをメッセージ長として使用するため)、2番目の読み取りはそのメッセージの合計バイト数を取得します。これは、ソケットからデータを読み取ることを処理する最も効率的で、最も望ましい方法ではないかもしれません。しかし、理解するのが最も簡単で簡単です。プロトコルが異なり、メッセージのサイズがはるかに大きく、メッセージ全体が受信される前にメッセージの処理を開始できる場合は、別の方法を使用することを検討することもできます。

  5. このコードでは、Visual Studio 2008 for WindowsでBoost 1.52.0を使用しています。

+0

ありがとう、これは私にそれを行う方法のアイデアを与えてくれました.Samは、より多くの接続があるまで、非同期ルートに行くことは実り得ないかもしれないと述べました。私はこれをバックポケットに入れておきます。 – RishiD

+0

一見このコードは間違っているようですが、 'CodeLock(SocketLock)をロックする。 boost :: asio :: async_write(...); '?このコメントは、これがシングルスレッドであることを示すようです。ミューテックスはここで十分ではありません。なぜなら、アプリケーションは、ストリームごとに最大で1回の書き込み操作を確実に実行する必要があるからです。 –

+0

私はこの問題を最初に解消すると、使用されるスレッドの数が不明で、マルチプルスレッドが同じソケットオブジェクトを使用しようとしている可能性があると心配していました。私は後で、読み取りのための単一のスレッドと書き込みのための単一のスレッドを使用するようにコードを洗練しましたが、ロックに残しました。私はおそらくそれらを取る必要があります。 「ストリームごとに最大で1つの書き込み操作が確実に実行される必要があるため、ここではmutexで十分ではありません。」とはどういう意味ですか? ?代わりにあなたは何をしますか? HTTP Server 3の例を見てきましたか? –

1

Asio examplesに含まれる1対多数のクライアント/サーバー設計の直接の例はありません。設計が最大10個の接続で固定されている場合は、スレッドごとに同期通信を使用しても問題ありません。しかし、これ以上の拡張を予定している場合は、数百または数千のスレッドを作成することによる利益の減少が分かります。

つまり、async_connectasync_readおよびasync_writeを組み合わせて使用​​することは、理解または実装するのが難しくありません。私はこの同じ概念を使って、ほんの一握りのスレッドを使ってworld's fastest supercomputerの数千の接続を管理しました。 async TCP client exampleは、このルートを選択した場合には、おそらく最良のものです。

単に例以外のものをお探しの場合は、有用であるかもしれないAsioを使用しているものがあります。open source projects

+0

入力していただきありがとうございます。まだ私の脳を非同期通信で囲んでいると思います。私はサーバー側の値を理解していますが、クライアント側の値を見ることができませんでした。 – RishiD

関連する問題