2017-02-22 1 views
0

私は通常、cronトリガーを使ってSFTP経由で毎日ファイルをポーリングするSpring統合アプリケーションを持っています。しかし、期待されるファイルが見つからない場合は、y回まで定期的なトリガーでx分ごとにポーリングする必要があります。これを行うには、次のコンポーネントを使用します。複合トリガでオーバーライドを設定するにはどうすればよいですか?

@Component 
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice { 

    private final static Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class); 

    private final CompoundTrigger compoundTrigger; 

    private final Trigger override; 

    private final ApplicationProperties applicationProperties; 

    private final Mail mail; 

    private int attempts = 0; 

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, 
      @Qualifier("secondaryTrigger") Trigger override, 
      ApplicationProperties applicationProperties, 
      Mail mail) { 
     this.compoundTrigger = compoundTrigger; 
     this.override = override; 
     this.applicationProperties = applicationProperties; 
     this.mail = mail; 
    } 

    @Override 
    public boolean beforeReceive(MessageSource<?> source) { 
     return true; 
    } 

    @Override 
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) { 
     final int maxOverrideAttempts = applicationProperties.getMaxFileRetry(); 
     attempts++; 
     if (result == null && attempts < maxOverrideAttempts) { 
      logger.info("Unable to find load file after " + attempts + " attempt(s). Will reattempt"); 
      this.compoundTrigger.setOverride(this.override); 
     } else if (result == null && attempts >= maxOverrideAttempts) { 
      mail.sendAdminsEmail("Missing File"); 
      attempts = 0; 
      this.compoundTrigger.setOverride(null); 
     } 
     else { 
      attempts = 0; 
      this.compoundTrigger.setOverride(null); 
      logger.info("Found load file"); 
     } 
     return result; 
    } 

    public void setOverrideTrigger() { 
     this.compoundTrigger.setOverride(this.override); 
    } 

    public CompoundTrigger getCompoundTrigger() { 
     return compoundTrigger; 
    } 
} 

ファイルが存在しない場合、これは素晴らしい動作です。すなわち、オーバーライド(すなわち、周期的トリガ)が有効になり、y回の試行までx分毎にポーリングされる。

しかし、ファイルは存在しますが、期待されるファイルではない場合(データの日付が間違っているなど)、別のクラス(ファイルを読み取る)がクラスのsetOverrideTriggerを呼び出します。しかし、afterReceiveはその後x分ごとに呼び出されません。これはなぜでしょうか?

SftpInboundFileSynchronizer

はここで、アプリケーション・コードの多くはで

@Bean 
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { 
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); 
    fileSynchronizer.setDeleteRemoteFiles(false); 
    fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory()); 
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>(); 
    compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp")); 
    compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern())); 
    fileSynchronizer.setFilter(compositeFileListFilter); 
    fileSynchronizer.setPreserveTimestamp(true); 
    return fileSynchronizer; 
} 

セッションファクトリです:

@Bean 
public SessionFactory<LsEntry> sftpSessionFactory() { 
    DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory(); 
    sftpSessionFactory.setHost(applicationProperties.getSftpHost()); 
    sftpSessionFactory.setPort(applicationProperties.getSftpPort()); 
    sftpSessionFactory.setUser(applicationProperties.getSftpUser()); 
    sftpSessionFactory.setPassword(applicationProperties.getSftpPassword()); 
    sftpSessionFactory.setAllowUnknownKeys(true); 
    return new CachingSessionFactory<LsEntry>(sftpSessionFactory); 
} 

SftpInboundFileSynchronizingMessageSourceが複合トリガーを使用してポーリングするように設定されています。春のバッチジョブオフ

@Bean 
@ServiceActivator(inputChannel = "sftpChannel") 
public MessageHandler dailyHandler(SimpleJobLauncher jobLauncher, Job job, Mail mail) { 
    JobRunner jobRunner = new JobRunner(jobLauncher, job, store, mail); 
    jobRunner.setDaily("true"); 
    jobRunner.setOverwrite("false"); 
    return jobRunner; 
} 

JobRunnerキック:

@Bean 
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata")) 
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() { 
    SftpInboundFileSynchronizingMessageSource source = 
      new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); 
    source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory()); 
    source.setAutoCreateLocalDirectory(true); 
    CompositeFileListFilter<File> compositeFileFilter = new CompositeFileListFilter<File>(); 
    compositeFileFilter.addFilter(new LastModifiedFileListFilter()); 
    compositeFileFilter.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem")); 
    source.setLocalFilter(compositeFileFilter); 
    source.setCountsEnabled(true); 
    return source; 
} 

@Bean 
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) { 
    PollerMetadata pollerMetadata = new PollerMetadata(); 
    List<Advice> adviceChain = new ArrayList<Advice>(); 
    adviceChain.add(retryCompoundTriggerAdvice); 
    pollerMetadata.setAdviceChain(adviceChain); 
    pollerMetadata.setTrigger(compoundTrigger()); 
    pollerMetadata.setMaxMessagesPerPoll(1); 
    return pollerMetadata; 
} 

@Bean 
public CompoundTrigger compoundTrigger() { 
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger()); 
    return compoundTrigger; 
} 

@Bean 
public CronTrigger primaryTrigger() { 
    return new CronTrigger(applicationProperties.getSchedule()); 
} 

@Bean 
public PeriodicTrigger secondaryTrigger() { 
    return new PeriodicTrigger(applicationProperties.getRetryInterval()); 
} 

アップデートは

ここでメッセージハンドラです。ジョブを処理した後、私のアプリケーションは、ファイルがその日の予定データを持っているかどうかを調べます。そうでない場合は、オーバーライドトリガを設定しています。

答えて

1

トリガーが動作する方法です。トリガーが起動すると、トリガーを変更する機会が得られます。

cronトリガーにリセットしたので、変更の次の機会は、トリガーが起動したときです(トリガーを変更する前に下流のフローによってポーラースレッドが解放された場合)。

ファイルを別のスレッド(キューチャネルまたはエグゼキュータ)に渡していますか?そうでなければ、nextExecutionTime()はダウンストリームフローが返されるまで呼び出されないため、トリガーの変更を適用する必要があります。

スレッドハンドオフがある場合、トリガーを変更する機会はありません。

+0

ありがとうございました。うん、それは私が知ることとは異なる糸です。それで、それはそれを説明します。 OP更新セクションに関連コードを追加しました。 'RetryCompoundTriggerAdvice'内から再試行することを決定するロジックを呼び出さずに、私の目標をどのように達成するかはわかりません。しかし、ロジックは、ファイル内のすべての行の読み込みと比較を含むいくつかの要因に基づいています(これが、バッチバッチジョブの完了後に適用される理由です)。 – James

+1

'CompoundTriggerAdvice'は、[スマートポーラー](http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#__smart_polling)の機能を提供します。 1つの可能性は、メッセージが処理されている間に、ファイル状態が評価されるまで短期ポーラーを維持することです。サービスは、すべてがうまくいるとのアドバイスを "確認"し、次の投票でトリガーを変更させます。ファイルが処理されている間(そして確認の前に)、 'beforeReceive()'が 'false'を返して現在のポーリングをキャンセルします –

関連する問題