2016-04-22 13 views
0

protobufでエンコードされたメッセージを使用して、kafka/samzaジョブのパイプラインを処理します。特定のデータセットではパイプラインがかなり長くなる可能性があります。パイプラインの各ステージにタイムスタンプ/ IDを追加して、効率とサービスの状態を監視する必要があります。シリアル化されたメッセージを別のprotobufメッセージに挿入できますか?

追加情報は、touchpointsというスキーマの繰り返しフィールドに追加されます。明らかに、java/samzaでメッセージをデコードすると、追加のメッセージを追加してもう一度シリアライズすると、メッセージのサイズとともに増加するオーバーヘッドが発生します(いくつかの場合、デシリアライズ時間が長くなります)。パイプの一部は、キーをデシリアライズしなくてもよいため、これらのオーバーヘッドが少なくて済みます。

デシリアライズせずに既存のメッセージに2番目のシリアライズされたメッセージを挿入することは可能ですか?そうすることは非常に悪い習慣になるでしょう(私は思うにすぎません)。メッセージパス/時刻を監視するための逆シリアル化/追加/シリアル化

答えて

2

一般的に、これは非常に扱いにくく、次の理由で「ストリーミング」の方法では実行できません:子メッセージには、可変長整数で表現する。したがって、何かを挿入すると、すべての親のサイズをルートまで再帰的に調整することになり、サイズの変更は、サイズの可変長エンコーディングのためにコンテンツを再度移動させる可能性があります。

この問題を回避するには、タイムスタンプに固定サイズのフィールドを使用し、最初の段階でプロトをビルドするときに値が埋め込まれていることを確認してください。 protoの対応するスペース。これにより、CodedInputStreamを使用する(理想的にはユニークな)タイムスタンプフィールドIDのプロトタイプをスキャンし、CodedOutputStreamを使用してパッチされたストリームを戻すことができます。これを正しく行うには、依然として内部形式を理解する必要があります。最初に空のパストラフ「フィルタ」を使い、出力が入力と一致することを確認することをお勧めします(問題があれば質問を更新してください)

+0

メッセージサイズを忘れてしまいますいくらか問題です。メッセージが持つ可能性のあるステージの数がわからないため、固定値が有効かどうかはわかりません。将来的にはlibのための良い将来のプロジェクトを作りますが、将来私はおそらくそれに戻ってくるでしょう。情報のための乾杯! –

関連する問題