2016-12-08 17 views
1

Spring Integrationを使用してOracleデータベースに特定のステータスのレコードを照会する、小さなSpring起動アプリケーションがあります。JpaPollingChannelAdapterには、JdbcPollingChannelAdapterのUpdateSqlと同様の機能がありますか?

最初にJdbcPollingChannelAdapterを使用して作業POCを作成しました。これは、見つかったレコードのステータスを変更して再スキャンされないようにする更新ステートメントも定義しました。

このアプリケーションではHibernateを使用しているため、Jdbcの実装をJPAのアプローチに置き換えたいと考えていました。

このように、JpaExecutorを使用してJpaPollingChannelAdapterを実装し、結果をエンティティ形式で取得することに成功しました。

私が達成しようとしているのは、ポーリングと同じトランザクションで見つかったすべてのレコードを更新するJdbcのアプローチと同様の動作です。

アダプター内でこれを達成する適切な方法はありますか、または単にメッセージハンドラー内でエンティティdaoを使用する必要がありますか?

うまくいけばうまくいきます。

更新

私は(それが唯一の検索のためだからInboundChannelAdapterに対して)私はOutboundChannelAdapterまたはゲートウェイを使用する必要がありますと仮定していthrough the docsを探しています。

私は、1つのハンドラ内ですべてを行うことができれば、または複数のチャネルを定義する必要がある場合は、適切に配線する方法になります。取得するには1つを選択し、エンティティのステータスを更新します。 JpaExecutordeleteAfterPollオプションを持っているJpaPollingChannelAdapter機能については

@Configuration 
public class IntegrationConfiguration { 

    private final Log log = LogFactory.getLog(getClass()); 

    @Autowired 
    private DataSource dataSource; 

    @Autowired 
    private EntityManager entityManager; 

    @Autowired 
    private Reactor rootReactor; 

    @Autowired 
    private RunDao runDao; 

    @Bean 
    @InboundChannelAdapter(channel = "notificationChannel", poller = @Poller(fixedDelay = "60000", maxMessagesPerPoll = "-1")) 
    public MessageSource<?> jpaMessageSource() { 
     return new JpaPollingChannelAdapter(jpaSelectExecutor()); 
    } 

    @Bean 
    @Gateway(requestChannel = "notificationChannel") 
    public void updateMessageStatus() { 
     JpaOutboundGateway gateway = new JpaOutboundGateway(jpaUpdateExecutor()); 
     gateway.setGatewayType(OutboundGatewayType.UPDATING); 
    } 

    @Bean 
    public JpaExecutor jpaSelectExecutor() { 
     JpaExecutor executor = new JpaExecutor(this.entityManager); 
     executor.setJpaQuery("select R from Run R where R.notificationStatus = 'NOT_SENT' and R.runStatus.status = 'COMPLETE' and R.runConfig.notificationRecipients is not null"); 
     executor.setEntityClass(Run.class); 
     return executor; 
    } 

    @Bean 
    public JpaExecutor jpaUpdateExecutor() { 
     JpaExecutor executor = new JpaExecutor(this.entityManager); 
     executor.setJpaQuery("update Run R set R.notificationStatus = 'SENDING' where R.RunId = :RunId"); 
     executor.setEntityClass(Run.class); 
     return executor; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "notificationChannel") 
    public MessageHandler jpaMessageHandler() { 
     MessageHandler handler = new MessageHandler() { 
      @Override 
      public void handleMessage(Message<?> message) throws MessagingException { 
       for (Run run : (ArrayList<Run>) message.getPayload()) { 
        rootReactor.notify("send-email-notification", Event.wrap(run)); 
        //run.setNotificationStatus(Run.NotificationStatus.SENDING); 
        //runDao.merge(run); 
       } 
      } 
     }; 

     return handler; 
    } 

    @Bean 
    public IntegrationFlow pollingAdapterFlow() { 
     return IntegrationFlows 
       .from(jpaMessageSource()) 
       .handle(jpaMessageHandler()) 
       .get(); 
    } 
} 

答えて

0

:ここ

は、いくつかの基本的なコードです。

エンティティクラスのHibernate @SQLDeleteを使用して、デフォルトのDELETEをカスタムUPDATEに変更することができます。

または、同じポーリングトランザクションでmerge操作の場合は一方向にJpaOutboundGatewayを使用できます。しかしその場合は、fixedDelayでなければならず、タスクが重ならないようにfixedRateでなく、パラレルスレッドで同じデータをポーリングしないでください。

+0

ありがとうございましたArtem、私はfixedDelayを使ってJpaOutboundGatewayマージアプローチを調べます。 – rcurrie

+0

ちょうどフォローアップの質問。私はJpaOutboundGatewayを使用する2番目の提案を実装しようとしています。上記のコードのJpaPollingChannelAdapterの定義をJpaOutboundGatewayアダプタに置き換える必要がありますか?私は、レコードを取得するための選択クエリと、エンティティのステータスを更新するステップの両方を定義する方法に苦労しています。 – rcurrie

+0

いいえ、あなたはしないでください。 SELECTには 'JpaPollingChannelAdapter'を使い、UPDATEには' JpaOutboundGateway'を使います。それは明確ですか? –

関連する問題