2017-05-17 22 views
2

NiFiに新たに追加!イベント通知機能としてNiFiフローファイルを使用する

NiFiのフローファイルに属性を持つ空のフローファイルを送信する方法があるのでしょうか?私はイベントの種類が開始したことを示すトリガとしてこれを使用したいと思います。

NiFiでは、一連のイベントが開始され、終了したことを示す他の方法はありますか?例えば、私がデータを読み込む3つのプロセッサを持っていて、最初のプロセッサが起動されようとしていて、最後のプロセッサが終了したことを知りたいのですが。とにかくこれをするために私にはありますか?プロセッサーが稼動を続けている場合は、プロセッサー1からプロセッサー3に読み込んだデータを1回のパスでグループ化できるようにしたいと考えています。事前に おかげで、

Begin 
Processor1 
Processor2 
Processor3 
End 
Begin 
Processor1 
Processor2 
Processor3 
End 
... 

任意の助けをいただければ幸い、これはより明確にするために!

答えて

8

ここでは多くのことが起こっているので、私はこの回答をいくつかの部分に分けるつもりです。

属性を持つ空のフローファイルをNiFiのフローファイルに送信する方法があるのでしょうか?私はこれをトリガーとして使用して、 というイベントの種類が「開始済み」であることを示したいと思います。

プロセッサGenerateFlowFileプロセッサでは、空の(または入力された)フローファイルを定期的な実行スケジュールまたはCRONスケジューリングで送信できます。これをプロセッサーUpdateAttributeと組み合わせて、任意の静的または動的属性をフローファイルに追加することができます。

NiFiでは、イベントセット が開始され、終了したことを示す他の方法はありますか?たとえば、私は3つのプロセッサ がデータを読み込んでいて、最初のプロセッサが起動しようとしていて、最後のプロセッサが終了していることを知りたいと思っています。とにかくこれを行うために私には ですか?

これは、Apache NiFiが設計や最適化されていないバッチ処理に近づいています。 のソースプロセッサを決定するには、「起動しようとしています」が非常に困難です。そのプロセッサがタイマー/ CRONベースで起動されている場合は、そのタイミングを知ることができますが、「GetFileがファイルを正常に取得しようとしている」という意味ならば、それは簡単に実行できません。独自のカスタマープロセッサーでプロセッサーを拡張し、onTrigger()メソッドをオーバーライドして、DistributedMapCacheClientServiceに別のプロセッサーが対応できる値を保管することができます。あるいは、ロジックをExecuteScriptプロセッサでラップし、カスタム通知コードを書くことができます。私はターゲットここに - はこの状態の変化を通知されますかわからないですか?それは別のプロセッサ、人間オブザーバー、または外部サービスですか?

プロセッサが稼働し続ける場合は、 を1回の処理でプロセッサ1からプロセッサ3に読み取ったデータをグループ化することができます。 このより明確

Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2 Processor3 End ...

を作るためにしかし、私が何を求めていることは、新たなWaitNotifyプロセッサの使用で可能であると考えています。川村浩司は、その使用方法を記述した良い記事を書いていますhere

このケースでは、一度に1つのデータ単位でない限り、システムを経由してくるバッチを検出できるように特別なコンテンツまたは属性が必要だと思います。私は以下の2つのシナリオを説明しようとしますが、私はこれに関する多くのコンテキストを持っていません。

シナリオ1(データのシングルユニット)

異なるソース・プロセッサを代用す​​ること自由に感じ、私は簡単のためGetFileを使用しています。

テキストファイル(外部プロセスによってそこに配置されている)でいっぱいのディレクトリがあるとします。各ファイルには "Firstname Lastname"という形式のテキストが含まれており、ファイル名を入力したタイムスタンプとともにLastname_YYYY-MM-DD-HH-mm-ss.txtという名前が付けられています。

GetFile -> ReplaceText -> PutFile 

GetFileプロセッサは、別々のflowfileとして各ファイルにもたらします。そこから、ReplaceTextは、正規表現を使って名前の順番を入れ替えるのと同じように簡単にできるし、PutFileは内容をファイルシステムに書き戻す。最初にGetFileがトリガーされると、nフローファイルが接続/キューReplaceTextにディスパッチされます。並列ではなくリニアに処理を待機させたい場合は、の成功をキューから1フローファイルに設定して、キューが空になるまで前のプロセッサ(GetFile)が実行されないようにします。

シナリオ2(複数flowfilesが一緒にグループ化され、連動して上の操作する必要があります)。ここ

あなたは、単一の一つに、複数のflowfilesを収集するためにMergeContentを使用したいと思います。空きしきい値がnのフローファイルに設定され、MergeContentプロセッサは、成功フローファイルのみが、着信するフローファイルの最小数に達したときに送信されます。属性でビンすることもできます。したがって、異種入力ソースから読み込んでいる場合でも、共通の機能に基づいて関連するデータを関連付けることができます。 Wait & Notify

代替シナリオに加えて、あなたはそれらの所望の目的地に「コンテンツ」flowfilesを「解放」に対応するWaitプロセッサにトリガflowfileを送信するためにNotifyプロセッサを使用することができます。繰り返しになりますが、上にリンクしているKojiの記事では、この流れとスクリーンショットの例を詳しく説明しています。

少なくともこれに従うことをお勧めします。これ以上コンテキストがなければ、ここではNiFi以外の問題を解決しようとしている感覚を得ることができますし、ストリーミングの精神をよりよくサポートするためにデータフローモデルを適合させることもできます。より多くの情報があれば、私は答えを広げてうれしいです。

+1

タイムリーな対応をとってくれてありがとう、ありがとうございます - 私はそれがどれだけ高く評価されているかを教えてくれません!あなたの答えは非常に明確で理解しやすいです。私が探しているのは、待っていると通知することだと思う - 私は間違いなくこれを指摘してくれてありがとう! – BigBug

関連する問題