2016-06-21 18 views
3

protobufを使用して、クライアントとサーバの間でメッセージをやりとりしたいと考えています。私の場合は、サーバーからクライアントに任意の数のprotobufメッセージを送信したいと思います。これをC++ですばやく構築するにはどうすればよいですか?シンプルなC++ protobufストリーミングクライアント/サーバの作成

注:私はstackoverflowの上で本当に便利Kenton Varda answerFulkerson answerをプールした後、私の答えと一緒にこの質問を書きました。他の人たちも同様の質問をして、同様の障害を起こしている - herehere、およびhereを参照してください。

私はprotobufとasioで新しくなっていますので、改善を是正/提案したり、自分で答えを出すことができます。

答えて

6

最初に、C++ protobuf APIには、複数のprotobufメッセージを1つのストリーム/接続で送信するための組み込みサポートがありません。 Java APIにはありますが、まだC++バージョンには追加されていません。 Kenton Varda(protobuf v2の作成者)は、C++ versionを投稿するのに十分な大きさでした。したがって、単一の接続で複数のメッセージをサポートするには、そのコードが必要です。

次に、boost :: asioを使用してクライアント/サーバーを作成できます。 Do not asioが提供するistream/ostreamスタイルのインターフェイスを使用しようとします。 protobufで必要なストリームタイプ(ZeroCopyInputStream/ZeroCopyOutputStream)を作成するのは簡単ですが、動作しません。私は完全に理解していない理由は、this answer Fulkersonはそれをしようとするの脆弱な性質について話しています。また、生のソケットを必要な型に適合させるためのサンプルコードも提供しています。

これを基本的なboost :: asioチュートリアルと一緒にまとめると、クライアントとサーバーの後にサポートコードが続きます。 MyMessage.pb.hにあるpersistence :: MyMessageという単純なprotobufクラスの複数のインスタンスを送信しています。自分のものと交換してください。

クライアント:

#include <boost/asio.hpp> 
#include "ProtobufHelpers.h" 
#include "AsioAdapting.h" 
#include "MyMessage.pb.h" 
using boost::asio::ip::tcp; 
int main() 
{ 
    const char* hostname = "127.0.0.1"; 
    const char* port = "27015"; 
    boost::asio::io_service io_service; 
    tcp::resolver resolver(io_service); 
    tcp::resolver::query query(hostname, port); 
    tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); 
    tcp::socket socket(io_service); 
    boost::asio::connect(socket, endpoint_iterator); 
    AsioInputStream<tcp::socket> ais(socket); 
    CopyingInputStreamAdaptor cis_adp(&ais); 
    for (;;) 
    { 
     persistence::MyMessage myMessage; 
     google::protobuf::io::readDelimitedFrom(&cis_adp, &myMessage); 
    } 
    return 0; 
} 

サーバー:ここ

#include <boost/asio.hpp> 
#include "ProtobufHelpers.h" 
#include "AsioAdapting.h" 
#include "MyMessage.pb.h" 
using boost::asio::ip::tcp; 
int main() 
{ 
    boost::asio::io_service io_service; 
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 27015)); 
    for (;;) 
    { 
     tcp::socket socket(io_service); 
     acceptor.accept(socket); 
     AsioOutputStream<boost::asio::ip::tcp::socket> aos(socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket 
     CopyingOutputStreamAdaptor cos_adp(&aos); 
     int i = 0; 
     do { 
      ++i; 
      persistence::MyMessage myMessage; 
      myMessage.set_myString("hello world"); 
      myMessage.set_myInt(i); 
      google::protobuf::io::writeDelimitedTo(metricInfo, &cos_adp); 
      // Now we have to flush, otherwise the write to the socket won't happen until enough bytes accumulate 
      cos_adp.Flush(); 
     } while (true); 
    } 
    return 0; 
} 

はケントンヴァルダのサポートファイルの礼儀です:

ProtobufHelpers.h

#pragma once 
#include <google/protobuf/io/coded_stream.h> 
#include <google/protobuf/io/zero_copy_stream.h> 
#include <google/protobuf/message_lite.h> 
namespace google { 
    namespace protobuf { 
     namespace io { 
      bool writeDelimitedTo(
       const google::protobuf::MessageLite& message, 
       google::protobuf::io::ZeroCopyOutputStream* rawOutput); 

      bool readDelimitedFrom(
       google::protobuf::io::ZeroCopyInputStream* rawInput, 
       google::protobuf::MessageLite* message); 
     } 
    } 
} 

ProtobufHelpers.cpp

#include "ProtobufHelpers.h" 
namespace google { 
    namespace protobuf { 
     namespace io { 
      bool writeDelimitedTo(
       const google::protobuf::MessageLite& message, 
       google::protobuf::io::ZeroCopyOutputStream* rawOutput) { 
       // We create a new coded stream for each message. Don't worry, this is fast. 
       google::protobuf::io::CodedOutputStream output(rawOutput); 

       // Write the size. 
       const int size = message.ByteSize(); 
       output.WriteVarint32(size); 

       uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); 
       if (buffer != NULL) { 
        // Optimization: The message fits in one buffer, so use the faster 
        // direct-to-array serialization path. 
        message.SerializeWithCachedSizesToArray(buffer); 
       } 
       else { 
        // Slightly-slower path when the message is multiple buffers. 
        message.SerializeWithCachedSizes(&output); 
        if (output.HadError()) return false; 
       } 

       return true; 
      } 

      bool readDelimitedFrom(
       google::protobuf::io::ZeroCopyInputStream* rawInput, 
       google::protobuf::MessageLite* message) { 
       // We create a new coded stream for each message. Don't worry, this is fast, 
       // and it makes sure the 64MB total size limit is imposed per-message rather 
       // than on the whole stream. (See the CodedInputStream interface for more 
       // info on this limit.) 
       google::protobuf::io::CodedInputStream input(rawInput); 

       // Read the size. 
       uint32_t size; 
       if (!input.ReadVarint32(&size)) return false; 

       // Tell the stream not to read beyond that size. 
       google::protobuf::io::CodedInputStream::Limit limit = 
        input.PushLimit(size); 

       // Parse the message. 
       if (!message->MergeFromCodedStream(&input)) return false; 
       if (!input.ConsumedEntireMessage()) return false; 

       // Release the limit. 
       input.PopLimit(limit); 

       return true; 
      } 
     } 
    } 
} 

とフルカーソンの礼儀:

AsioAdapting.h

#pragma once 
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> 

using namespace google::protobuf::io; 


template <typename SyncReadStream> 
class AsioInputStream : public CopyingInputStream { 
public: 
    AsioInputStream(SyncReadStream& sock); 
    int Read(void* buffer, int size); 
private: 
    SyncReadStream& m_Socket; 
}; 


template <typename SyncReadStream> 
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) : 
    m_Socket(sock) {} 


template <typename SyncReadStream> 
int 
AsioInputStream<SyncReadStream>::Read(void* buffer, int size) 
{ 
    std::size_t bytes_read; 
    boost::system::error_code ec; 
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec); 

    if (!ec) { 
     return bytes_read; 
    } 
    else if (ec == boost::asio::error::eof) { 
     return 0; 
    } 
    else { 
     return -1; 
    } 
} 


template <typename SyncWriteStream> 
class AsioOutputStream : public CopyingOutputStream { 
public: 
    AsioOutputStream(SyncWriteStream& sock); 
    bool Write(const void* buffer, int size); 
private: 
    SyncWriteStream& m_Socket; 
}; 


template <typename SyncWriteStream> 
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) : 
    m_Socket(sock) {} 


template <typename SyncWriteStream> 
bool 
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size) 
{ 
    boost::system::error_code ec; 
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec); 
    return !ec; 
} 
4

私はgRPCを使用してお勧めします。クライアントとサーバーが単一の論理要求の一部として時間の経過とともにいずれかの方向に複数のメッセージを送信できる「ストリーミング」要求をサポートしています。 gRPCを使用することで、多くの重要な設定があなたのために行われます。詳細なドキュメントとチュートリアルがあり、TLS暗号化が組み込まれています。クロスランゲージサポートがあり、新しい種類のリクエストとパラレルを簡単に追加できますストリームなど

関連する問題