現在、RabbitMQメッセージバスからメッセージを取り出し、メッセージサイズ(バイト単位)を追加し、HDFSシンクを使用してメッセージを出力する必要があります。Spring Cloud Dataflow(rabbit | processor | hdfs)出力バイナリ
開始するには、メッセージにサイズを付加する独自のプロセッサを作成しました。私がこれをする理由は、エンコーディングがGoogleのプロトコルバッファである必要があるためです。
私のアプリは、次のようになります。
stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs "
HDFSシンクは、私は12768762 @ [Bを参照してくださいメッセージを出力します。私の周りGoogle'dしており、以下を追加するための推奨事項を見てきました:
spring.cloud.stream.bindings.input.consumer.headerMode=raw
しかし、これがすべてで私を助けるために見ていません!つまり、次のようにファイルを表示するようにアプリを変更した場合:
[input | processor ] | file --binary=true
これで問題なく動作します。しかし、私はHDFSシンクで提供されるロールオーバー機能が気に入っています。
アイデア?
はい、私が実装したプロセッサは、GPBのバイト配列を返します。あなたはそれがjava.io.Serializableを返さなければならないと言っていますか? –
私は "stream deploy --name rabbit-to-log --properties"を試しました。app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io。シリアライズ可能な ""は動作しませんでした。プロセッサがbyte []に対抗するjava.io.Serializableを返さなければならないと言っていますか? –
com.google.protobuf.GeneratedMessageV3から継承したエンベロープオブジェクトを返すようにプロセッサを書き換えました。このクラスはSerializableから継承します。新しいアプリケーションの登録を解除/登録してデータを処理すると、「contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope」を使用して[CUAVProtos $ Envelope]をデシリアライズできませんでした –