2017-01-21 14 views
2

Apache NiFiでは、FetchS3Objectを使用してS3バケットから読み取ると、バケット内のすべてのオブジェクトを読み取ることができ、追加されるとわかります。それは可能ですか?NiFiでは、FetchS3Objectプロセッサから選択的に読み込むことは可能ですか?

  1. すでに追加されているオブジェクトではなく、現在追加されているオブジェクトのみを読み取るようにプロセッサを構成しますか?
  2. バケット内の特定のフォルダを読み取らせるにはどうすればよいですか?

NiFiはよく知られているように、よく知られている多くのプロセッサのドキュメントにその例があります。

答えて

3

ListS3とFetchS3Objectプロセッサの組み合わせは、これを行います。

  1. ListS3 - あなたのS3バケットを列挙し、各オブジェクトを参照flowfilesを生成します。 接頭辞プロパティを設定して、バケット内の特定のフォルダを指定して、サブセットのみを列挙することができます。 ListS3は、NiFiの状態機能を使って読み込んだ内容を追跡しているので、新しいオブジェクトがバケットに追加されると、新しいフローファイルが生成されます。
  2. FetchS3Object - S3オブジェクトをフローファイルの内容に読み込む。 ListS3の出力を使用するには、FetchS3Objectのバケットプロパティを${s3.bucket}およびオブジェクトキープロパティを${filename}に設定します。

enter image description here

+0

ありがとうございました。私はすでにそれを成功させています。私の質問は、新しいファイルが追加されたときだけ読み込み、バケットの古いファイルは読み込めない特定のユースケースに特有のものです。 – Sammy

+0

ListS3は新しいオブジェクトを識別します。あなたはそれを実行して '今'まで読んで、既存のファイルの出力を破棄することができます。 – James

+0

ListS3 + FetchS3ObjectをRouteOnAttributeとともに使用しました。ここで、条件$ {s3.lastModified:ge(1485189600000)}を追加して、最近追加されたドキュメントのみをルーティングしました。 – Sammy

1

別のアプローチは、SQSキューをサブスクライブし、SNSの通知を送信するためにあなたのS3バケットを設定することです。 NiFiは、SQSキューから通知を受信し、対象となるオブジェクトをフィルタ処理して処理します。

このアプローチの詳細については、Monitoring An S3 Bucket in Apache NiFiを参照してください。

1

GetSQSプロセッサとfetchS3Objectプロセッサを使用し、GETSQSプロセッサが新しく追加されたファイルの通知を受信するように設定します。これは、新しいファイルが来るたびにSQSキューからnifiに通知を送信するようなイベント駆動型アプローチです。 完全な説明を得るには、以下のリンクを使用してください: AWS-NIFI integration

関連する問題