2017-08-17 13 views
0

現在、apache camelを使用してSQSメッセージを消費していますが、すべて正常に動作しています。Apache Camel - 本文に基づいてメッセージを集約します。

1つのプロジェクトの一部として、ファイルをアップロードするときにS3通知イベントを消費します。ファイルは、予測可能なS3キー((<type>/<account-id>/<id>/<file>))にアップロードされます。

処理時に、メッセージを1つのエクスチェンジに集約するためにラクダを使用しています(10個のメッセージを待つか、1秒後にタイムアウトする)。私は不思議に思っていましたが、S3に基づいて集約する方法があります - たとえば、同じタイプまたはIDを持つ集約メッセージ。

私はラクダのドキュメントを読んで理解しているので、Jsonペイロードやヘッダー値を照会する方法があります。これは可能なアプローチですか(S3イベント通知はJsonメッセージであり、AWSドキュメントではPUT操作単一のレコードエントリーを生成するだけでしょうか?)または、私自身のAggregatorを実装する必要がありますか?

コンテキストを少し追加する - データを収集してS3にアップロードするサービスがあります。別のサービスは通知されるとこのデータをダウンロードし、処理して別のバケットにアップロードします。 S3通知を集約できれば、データを結合してアップロードすることができます。アップロードとAPI呼び出しの量を減らすことができます。

答えて

1

camel-aws s3コンポーネントを使用すると、S3キーにアクセスできますメッセージのCamelAwsS3Keyヘッダーであり、本文を照会する必要はありませんが、S3キーから必要なフィールドを抽出する必要があります。

+0

ええ、私はs3コンポーネントをチェックアウトしました。処理したファイルを削除することはできませんでした(他のアプリケーションでも必要かもしれません)。ポーリング後にどのファイルがすでに処理されていたかを把握する必要があります。 – KingTravisG

+0

このコンポーネントが処理されたファイルの削除を無効にするためのパラメータ: 'deleteAfterRead = false'。 「どのファイルがすでに処理されたかを把握する必要がある」とはどういう意味ですか?重複を除外することを意味するならば、 'CamelAwsS3Key'をキーとして[冪等リポジトリ](http://camel.apache.org/idempotent-consumer.html)を使うことができます。 – mgyongyosi

+0

ええ、ファイルはバックアップのために保存され、他のアプリケーションが処理するようになっています - リポジトリはソリューションのように見えますが、追加のインフラストラクチャなどを避けようとしており、時間の経過とともにパフォーマンスが心配されます。ファイルを保存する必要がなければ、これはおそらく最良の解決策になるでしょう(私は既にSQSのためにラクダを使用しているので、いくつかの経験もありますが、それはボーナスです)。 – KingTravisG

0

最高または最も一般的なソリューションではありませんが、私はこの作業を取得する方法を見つけた -

私は単にアグリゲータに渡す前に呼び出され、追加のプロセッサを追加しました。プロセッサーは、S3キーのイベント記録(S3からのPUTイベントをリスンしています(AWSドキュメントによると1つのレコードのみにする必要があるため)をチェックし、メッセージにヘッダーを設定します。

アグリゲータは、これらのヘッダ(単にS3型S3-アカウント-ID、およびS3-ID)に基づいて交流を結合することができます。

関連する問題