2017-09-22 21 views
0

現在、RabbitMQメッセージバスからメッセージを取り出し、メッセージサイズ(バイト単位)を追加し、HDFSシンクを使用してメッセージを出力する必要があります。Spring Cloud Dataflow(rabbit | processor | hdfs)出力バイナリ

開始するには、メッセージにサイズを付加する独自のプロセッサを作成しました。私がこれをする理由は、エンコーディングがGoogleのプロトコルバッファである必要があるためです。

私のアプリは、次のようになります。

stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs " 

HDFSシンクは、私は12768762 @ [Bを参照してくださいメッセージを出力します。私の周りGoogle'dしており、以下を追加するための推奨事項を見てきました:

spring.cloud.stream.bindings.input.consumer.headerMode=raw 

しかし、これがすべてで私を助けるために見ていません!つまり、次のようにファイルを表示するようにアプリを変更した場合:

[input | processor ] | file --binary=true 

これで問題なく動作します。しかし、私はHDFSシンクで提供されるロールオーバー機能が気に入っています。

アイデア?

答えて

0

ファイルは、受信したバイトをダンプするだけなので、HDFSシンクを見ると、入力としてjava.io.Serializableオブジェクトを消費する必要があるようです。しかし、あなたの場合は、protobufオブジェクトからバイト配列を送信しています(私はそれが起こっていると仮定しています)

+0

はい、私が実装したプロセッサは、GPBのバイト配列を返します。あなたはそれがjava.io.Serializableを返さなければならないと言っていますか? –

+0

私は "stream deploy --name rabbit-to-log --properties"を試しました。app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io。シリアライズ可能な ""は動作しませんでした。プロセッサがbyte []に​​対抗するjava.io.Serializableを返さなければならないと言っていますか? –

+0

com.google.protobuf.GeneratedMessageV3から継承したエンベロープオブジェクトを返すようにプロセッサを書き換えました。このクラスはSerializableから継承します。新しいアプリケーションの登録を解除/登録してデータを処理すると、「contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope」を使用して[CUAVProtos $ Envelope]をデシリアライズできませんでした –

0

型は互換性がありません。そのcontentTypeをSCSに設定することによって、java直列化を使用してwriteObjectを呼び出すようにフレームワークに要求するだけです。しかし、すでにシリアル化フレームワークであるprotobufを使用しているので、動作しません。ここで問題となるのは、シンクはシンクロコードを期待する(シンクコードに慣れていない)ようですが、シンクロナイズはできません。あなたができることは、シンクアプリを修正するか、またはprotobufからSerializableに変換する方法を知っているカスタムコンバータを提供することです。それが正直であると分かるかどうかは分かりません。

関連する問題