ここでは多くのことが起こっているので、私はこの回答をいくつかの部分に分けるつもりです。
属性を持つ空のフローファイルをNiFiのフローファイルに送信する方法があるのでしょうか?私はこれをトリガーとして使用して、 というイベントの種類が「開始済み」であることを示したいと思います。
プロセッサGenerateFlowFile
プロセッサでは、空の(または入力された)フローファイルを定期的な実行スケジュールまたはCRONスケジューリングで送信できます。これをプロセッサーUpdateAttribute
と組み合わせて、任意の静的または動的属性をフローファイルに追加することができます。
NiFiでは、イベントセット が開始され、終了したことを示す他の方法はありますか?たとえば、私は3つのプロセッサ がデータを読み込んでいて、最初のプロセッサが起動しようとしていて、最後のプロセッサが終了していることを知りたいと思っています。とにかくこれを行うために私には ですか?
これは、Apache NiFiが設計や最適化されていないバッチ処理に近づいています。 のソースプロセッサを決定するには、「起動しようとしています」が非常に困難です。そのプロセッサがタイマー/ CRONベースで起動されている場合は、そのタイミングを知ることができますが、「GetFile
がファイルを正常に取得しようとしている」という意味ならば、それは簡単に実行できません。独自のカスタマープロセッサーでプロセッサーを拡張し、onTrigger()
メソッドをオーバーライドして、DistributedMapCacheClientService
に別のプロセッサーが対応できる値を保管することができます。あるいは、ロジックをExecuteScript
プロセッサでラップし、カスタム通知コードを書くことができます。私はターゲットここに - はこの状態の変化を通知されますかわからないですか?それは別のプロセッサ、人間オブザーバー、または外部サービスですか?
プロセッサが稼働し続ける場合は、 を1回の処理でプロセッサ1からプロセッサ3に読み取ったデータをグループ化することができます。 このより明確
Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2 Processor3 End ...
を作るためにしかし、私が何を求めていることは、新たなWait
とNotify
プロセッサの使用で可能であると考えています。川村浩司は、その使用方法を記述した良い記事を書いていますhere。
このケースでは、一度に1つのデータ単位でない限り、システムを経由してくるバッチを検出できるように特別なコンテンツまたは属性が必要だと思います。私は以下の2つのシナリオを説明しようとしますが、私はこれに関する多くのコンテキストを持っていません。
シナリオ1(データのシングルユニット)
異なるソース・プロセッサを代用すること自由に感じ、私は簡単のためGetFile
を使用しています。
テキストファイル(外部プロセスによってそこに配置されている)でいっぱいのディレクトリがあるとします。各ファイルには "Firstname Lastname"という形式のテキストが含まれており、ファイル名を入力したタイムスタンプとともにLastname_YYYY-MM-DD-HH-mm-ss.txt
という名前が付けられています。
GetFile -> ReplaceText -> PutFile
GetFile
プロセッサは、別々のflowfileとして各ファイルにもたらします。そこから、ReplaceText
は、正規表現を使って名前の順番を入れ替えるのと同じように簡単にできるし、PutFile
は内容をファイルシステムに書き戻す。最初にGetFile
がトリガーされると、nフローファイルが接続/キューReplaceText
にディスパッチされます。並列ではなくリニアに処理を待機させたい場合は、の成功をキューから1
フローファイルに設定して、キューが空になるまで前のプロセッサ(GetFile
)が実行されないようにします。
シナリオ2(複数flowfilesが一緒にグループ化され、連動して上の操作する必要があります)。ここ
あなたは、単一の一つに、複数のflowfilesを収集するためにMergeContent
を使用したいと思います。空きしきい値がnのフローファイルに設定され、MergeContent
プロセッサは、成功フローファイルのみが、着信するフローファイルの最小数に達したときに送信されます。属性でビンすることもできます。したがって、異種入力ソースから読み込んでいる場合でも、共通の機能に基づいて関連するデータを関連付けることができます。 Wait
& Notify
と
代替シナリオに加えて、あなたはそれらの所望の目的地に「コンテンツ」flowfilesを「解放」に対応するWait
プロセッサにトリガflowfileを送信するためにNotify
プロセッサを使用することができます。繰り返しになりますが、上にリンクしているKojiの記事では、この流れとスクリーンショットの例を詳しく説明しています。
少なくともこれに従うことをお勧めします。これ以上コンテキストがなければ、ここではNiFi以外の問題を解決しようとしている感覚を得ることができますし、ストリーミングの精神をよりよくサポートするためにデータフローモデルを適合させることもできます。より多くの情報があれば、私は答えを広げてうれしいです。
タイムリーな対応をとってくれてありがとう、ありがとうございます - 私はそれがどれだけ高く評価されているかを教えてくれません!あなたの答えは非常に明確で理解しやすいです。私が探しているのは、待っていると通知することだと思う - 私は間違いなくこれを指摘してくれてありがとう! – BigBug