1

Spark StreamingのKafkaメッセージからNiFi Flowfile属性にアクセスしようとしています。私は言語としてJavaを使用しています。KafkaConsumerのNiFi Flowfile属性

NiFIは、GetSFTPプロセッサを使用してFTPロケーションからバイナリファイルを読み取り、publishKafkaプロセッサを使用してbyte []メッセージをKafkaに発行するというシナリオです。これらのbyte []属性はSpark Streamingジョブを使用してASCIIデータに変換され、これらのデコードされたASCIIはKafkaに書き込まれ、NiFiプロセッサを使用してHDFSに保存されます。

私の問題は、バイナリファイル名とデコードされたASCIIファイルを追跡できないということです。デコードされたASCIIでヘッダーセクション(ファイル名、ファイルサイズ、レコード数など)を追加する必要がありますが、KafkaConsumerオブジェクトからNiFi Flowfileからファイル名にアクセスする方法を理解できません。標準のNiFiプロセッサを使用してこれを行う方法はありますか?または、この機能を実現するために他の提案を共有してください。ありがとう。

答えて

1

だからあなたのデータフローは次のとおりです。

FTP - > NiFi - >カフカ - >スパークストリーミング - >カフカ - > NiFi - > HDFS ?

現在のところ、Kafkaは各メッセージにメタデータ属性を持っていません(ただし、これはKafka 0.11で発生する可能性があります).NiFiがトピックにメッセージを発行するとき、現在のところ、メッセージ。

元のコンテンツと必要な追加の属性を含むラッパーデータ形式(多分JSONまたはAvro)を構築しなければならないので、そのすべてをKafkaへの1つのメッセージのコンテンツとして公開することができます。

また、Sparkのストリーミングジョブで何をやっているのか正確にはわかりませんが、NiFiでその部分を行うだけの理由はありませんか?ウインドウやジョインを含む複雑なもののように聞こえるわけではないので、少し単純化してNiFiにデコードをさせてから、NiFiにKafkaとHDFSを書き込ませてもらいます。

+0

はいこれはデータフローです。私は、Kafka 0.11でメタデータ属性が利用できることを願っています。データのラッピングを検討します。 Spark Streamingの仕事に関しては、いくつかのイベントをトリガするウィンドウを実行します。私はNiFiを初めて使っています.NiFiでバイトをラップする方法を教えてください。 – Shahzad

+0

最も簡単な方法は、GroovyまたはJythonのスクリプトを記述し、ExecuteScriptプロセッサを使用して実行することです。それはかなりではありませんが、フローファイルのバイトとBase64でそれらをエンコードし、{"data":、 "filename": "the-flow-file-name"}のようなJSONドキュメントを生成することができます。 –

+0

https: /funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html –

関連する問題