2017-10-29 13 views
1

LuceneインデクサLuwakを使用してカスタムプロセッサを作成しようとしていますので、受信フローファイルに対してクエリを実行できます。私が把握しようとしているのは、Luwakモニタ(以下のサンプルコード)の内部に存在するクエリインデックスを更新する最良の方法です。NiFi - プロセッサでLuwak(Lucene)インデックスを更新する

EDIT - その他の使用コンテキスト

更新することで、私が入ってくるflowfilesに対して実行されるクエリを削除/ /更新を追加するために外部のユーザーを許可する意味。固定されたクエリのセットから始めますが、ユーザー(複数可)が受信メッセージに対して実行されているクエリを変更できるようにする必要があります。ここでは、実行されているクエリを変更するという課題があります。

他のオプションを検討する必要がありますか?クエリーを更新するには、そのうちの10k個があれば約20秒かかると思われます。これはおそらく稀ですが、再ロード/起動時間は私が検討しようとしているものです。

オプション私は考えた:

  1. はUpdateAttributeを使用し、すべてのflowfileに更新されます。理想的ではありません。特に、インデックスを作成するクエリがたくさんある場合
  2. http、AWS SQSなどを使用して、更新する優先度の高いフローファイルを送信します(他のどのソースよりも上位にする)。恐ろしいことではありませんが、それでも正しいとは思われません。
  3. NiFi APIを使用して、アップデート時にプロセッサを起動/停止します。更新が非常に効率的に行われるようには見えません。

をインスタンスモニター:

Monitor monitor = new Monitor(new LuceneQueryParser("field"), new TermFilteredPresearcher()); 

クエリを追加 - 私は最適化しようとしているもの:

 //Add queries to the monitor 
     for (Map.Entry<String, String> entry : bucketList.entrySet()) { 
      MonitorQuery q = new MonitorQuery(entry.getKey(),entry.getValue()); 
      monitor.update(q); 
     } 
+0

(IMHO)問題は明確ではありません。 'UpdateAttribute'、' AWS SQS'、 'NiFi API for start/stop'はどのように関連していますか? – daggett

+0

プロセッサを起動して修正するときにクエリを読み込まないでください。変化していることは、インデックス付きクエリと照合される着信フローファイルです。 –

+0

@daggett Bryan Bende:私は、より多くの文脈を提供するEDITを追加しました。私は、実行時に適用されているクエリをユーザーが変更できるようにしたいと思っています。これは私が挑戦しているところです。彼らは無期限に固定されないであろう。 – scarpacci

答えて

1

あなたのプロセッサが起動すると、あなたが定期的にバックグラウンドタイマースレッドを開始することができます新しいモニターを作成し、プロセッサーが使用しているモニターを置き換えます。

あなたはおそらく、あなたのようなプロセッサでのメンバ変数を作りたいでしょう:次に@OnScheduledに、あなたが最初のモニターを構築することができますし、ホルダーにそれを設定し

AtomicReference<Monitor> monitorHolder = new AtomicReference<Monitor>(); 

。プロセッサの現在の実行には影響を与えませんが、かかります、あなたがmonitorHolder.set(newMonitor)を呼び出すことができますバックグラウンドスレッドで次に

Monitor localMonitor = monitorHolder.get(); 

はその後onTriggerに、あなたは常に最初のモニタを取得しますonTriggerが呼び出された次回のときに有効になります。

+0

Ahhh。素晴らしいアイデアブライアン!ありがとう。私はそれを試みます。 – scarpacci

関連する問題