最初に、C++ protobuf APIには、複数のprotobufメッセージを1つのストリーム/接続で送信するための組み込みサポートがありません。 Java APIにはありますが、まだC++バージョンには追加されていません。 Kenton Varda(protobuf v2の作成者)は、C++ versionを投稿するのに十分な大きさでした。したがって、単一の接続で複数のメッセージをサポートするには、そのコードが必要です。
次に、boost :: asioを使用してクライアント/サーバーを作成できます。 Do not asioが提供するistream/ostreamスタイルのインターフェイスを使用しようとします。 protobufで必要なストリームタイプ(ZeroCopyInputStream/ZeroCopyOutputStream)を作成するのは簡単ですが、動作しません。私は完全に理解していない理由は、this answer Fulkersonはそれをしようとするの脆弱な性質について話しています。また、生のソケットを必要な型に適合させるためのサンプルコードも提供しています。
これを基本的なboost :: asioチュートリアルと一緒にまとめると、クライアントとサーバーの後にサポートコードが続きます。 MyMessage.pb.hにあるpersistence :: MyMessageという単純なprotobufクラスの複数のインスタンスを送信しています。自分のものと交換してください。
クライアント:
#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
const char* hostname = "127.0.0.1";
const char* port = "27015";
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(hostname, port);
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
tcp::socket socket(io_service);
boost::asio::connect(socket, endpoint_iterator);
AsioInputStream<tcp::socket> ais(socket);
CopyingInputStreamAdaptor cis_adp(&ais);
for (;;)
{
persistence::MyMessage myMessage;
google::protobuf::io::readDelimitedFrom(&cis_adp, &myMessage);
}
return 0;
}
サーバー:ここ
#include <boost/asio.hpp>
#include "ProtobufHelpers.h"
#include "AsioAdapting.h"
#include "MyMessage.pb.h"
using boost::asio::ip::tcp;
int main()
{
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 27015));
for (;;)
{
tcp::socket socket(io_service);
acceptor.accept(socket);
AsioOutputStream<boost::asio::ip::tcp::socket> aos(socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingOutputStreamAdaptor cos_adp(&aos);
int i = 0;
do {
++i;
persistence::MyMessage myMessage;
myMessage.set_myString("hello world");
myMessage.set_myInt(i);
google::protobuf::io::writeDelimitedTo(metricInfo, &cos_adp);
// Now we have to flush, otherwise the write to the socket won't happen until enough bytes accumulate
cos_adp.Flush();
} while (true);
}
return 0;
}
はケントンヴァルダのサポートファイルの礼儀です:
ProtobufHelpers.h
#pragma once
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/message_lite.h>
namespace google {
namespace protobuf {
namespace io {
bool writeDelimitedTo(
const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput);
bool readDelimitedFrom(
google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message);
}
}
}
と
ProtobufHelpers.cpp
#include "ProtobufHelpers.h"
namespace google {
namespace protobuf {
namespace io {
bool writeDelimitedTo(
const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
// We create a new coded stream for each message. Don't worry, this is fast.
google::protobuf::io::CodedOutputStream output(rawOutput);
// Write the size.
const int size = message.ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (buffer != NULL) {
// Optimization: The message fits in one buffer, so use the faster
// direct-to-array serialization path.
message.SerializeWithCachedSizesToArray(buffer);
}
else {
// Slightly-slower path when the message is multiple buffers.
message.SerializeWithCachedSizes(&output);
if (output.HadError()) return false;
}
return true;
}
bool readDelimitedFrom(
google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message) {
// We create a new coded stream for each message. Don't worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
}
}
}
}
とフルカーソンの礼儀:
AsioAdapting.h
#pragma once
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;
template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
public:
AsioInputStream(SyncReadStream& sock);
int Read(void* buffer, int size);
private:
SyncReadStream& m_Socket;
};
template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
m_Socket(sock) {}
template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
std::size_t bytes_read;
boost::system::error_code ec;
bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);
if (!ec) {
return bytes_read;
}
else if (ec == boost::asio::error::eof) {
return 0;
}
else {
return -1;
}
}
template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
public:
AsioOutputStream(SyncWriteStream& sock);
bool Write(const void* buffer, int size);
private:
SyncWriteStream& m_Socket;
};
template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
m_Socket(sock) {}
template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{
boost::system::error_code ec;
m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
return !ec;
}