0

私は、組織のCloud Foundryデプロイメントで実行されるデータ取り込みソリューションを作成するために、多くのSpring Cloud DataFlowと関連ドキュメントを読んできました。目標は、議論のために、おそらく1日3回、データのHTTPサービスをポーリングし、そのデータをPostgreSQLデータベースに挿入/更新することです。 HTTPサービスは、1日あたり10万件のレコードを提供しているようです。Spring Cloud DataFlow httpポーリングと重複排除

これまでの混乱のポイントは、ポーリングされたレコードを重複除外するためのDataFlowパイプラインのコンテキストでのベストプラクティスです。ソースデータには、ポーリングを追跡するのに役立つタイムスタンプフィールドはなく、粗い日レベルの日付フィールドのみがあります。また、遡及的に記録が更新されないという保証もありません。レコードは一意のIDを持つように見えるので、レコードをそのようにデッドアップすることができますが、DataFlowでそのロジックを実装するにはどうすればよいかをドキュメントに基づいて確信できません。私が知る限りでは、Spring Cloud Stream startersは、このアウトオブボックスを提供していません。私はSpring Integrationのsmart pollingについて読んでいましたが、それは私の懸念にも対処するためのものではないと思います。

私の直感は、ポーリングされたレコードがすでに挿入されているかどうかを判断するためにデータベースクエリを実行し、次に適切なレコードをターゲットデータベースに挿入するか、ストリームの下にそれらを渡すDataFlow StreamでカスタムプロセッサJavaコンポーネントを作成することです。 Streamアプリケーションで受け入れ可能な中間ステップでターゲットデータベースをクエリしていますか?また、Spring Cloud Taskにバッチ処理として実装することもできます。

DataFlowアプリケーションを進める最も良い方法は何ですか? DataFlow/Stream/Task/Integrationアプリケーションで説明した重複排除を実現するための共通/ベストプラクティスは何ですか?私はかなりのカスタムコードを書く必要があるので、私はスターターアプリの設定をコピーするか、ゼロから始めればよいのでしょうか? Spring Cloud DataFlowが必要なのでしょうか?DSLをまったく使用しているかどうかわからないからですか?すべての質問は申し訳ありませんが、Cloud FoundryやこれらのSpringプロジェクトには新しくなっているので、すべてをまとめていくのは難しいことです。

ご協力いただきありがとうございます。

答えて

2

要件を満たせば、カスタムプロセッサを作成する必要があります。重複を避けるために挿入されたものを追跡する必要があります。

このようなプロセッサをストリームアプリケーションに書き込むことは何も妨げられませんが、レコードごとにDBクエリを発行するため、パフォーマンスが低下する可能性があります。

注文が重要でない場合は、複数の並行メッセージを処理できるようにクエリを並列化できますが、結局DBは引き続き料金を支払うことになります。

もう1つの方法は、挿入されたレコードのチェックを高速化するのにかなり役立つbloomfilterを使用することです。

開始アプリケーションのクローンを作成することから始めます。ポーラートリガーでデータを取得し、カスタムコードプロセッサーを経由して最終的にjdbc-sinkに行くHTTPクライアントプロセッサーを起動できます。 SCDFを使用する利点のstream create time --triger.cron=<CRON_EXPRESSION> | httpclient --httpclient.url-expression=<remote_endpoint> | customProcessor | jdbc

一つのようなものは、あなたが独立して、このようなdeployer.customProcessor.count=8などのデプロイメント・プロパティを経由して、カスタムプロセッサをスケールすることができることを

+0

私の答えを見る –

1

春クラウドデータフローは、春のクラウドストリームに基づいて、データの統合ストリームを構築していますまた、Spring Integrationに完全に基づいています。また、Spring Integrationのすべての原則は、SCDFレベルのどこにでも適用できます。

実際にはコーディングを避けることはできませんが、必要なのはEIP Idempotent Receiverです。そしてSpring Integrationは私たちのために1つを提供します:

@ServiceActivator(inputChannel = "processChannel") 
    @IdempotentReceiver("idempotentReceiverInterceptor") 
    public void handle(Message<?> message) 
関連する問題