Spark StreamingのKafkaメッセージからNiFi Flowfile属性にアクセスしようとしています。私は言語としてJavaを使用しています。KafkaConsumerのNiFi Flowfile属性
NiFIは、GetSFTPプロセッサを使用してFTPロケーションからバイナリファイルを読み取り、publishKafkaプロセッサを使用してbyte []メッセージをKafkaに発行するというシナリオです。これらのbyte []属性はSpark Streamingジョブを使用してASCIIデータに変換され、これらのデコードされたASCIIはKafkaに書き込まれ、NiFiプロセッサを使用してHDFSに保存されます。
私の問題は、バイナリファイル名とデコードされたASCIIファイルを追跡できないということです。デコードされたASCIIでヘッダーセクション(ファイル名、ファイルサイズ、レコード数など)を追加する必要がありますが、KafkaConsumerオブジェクトからNiFi Flowfileからファイル名にアクセスする方法を理解できません。標準のNiFiプロセッサを使用してこれを行う方法はありますか?または、この機能を実現するために他の提案を共有してください。ありがとう。
はいこれはデータフローです。私は、Kafka 0.11でメタデータ属性が利用できることを願っています。データのラッピングを検討します。 Spark Streamingの仕事に関しては、いくつかのイベントをトリガするウィンドウを実行します。私はNiFiを初めて使っています.NiFiでバイトをラップする方法を教えてください。 – Shahzad
最も簡単な方法は、GroovyまたはJythonのスクリプトを記述し、ExecuteScriptプロセッサを使用して実行することです。それはかなりではありませんが、フローファイルのバイトとBase64でそれらをエンコードし、{"data":、 "filename": "the-flow-file-name"}のようなJSONドキュメントを生成することができます。 –
https: /funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html –