2016-09-11 5 views
2

Spring IntegrationからFileReadingMessageSourceのルートディレクトリを読み込んで、進行中のファイル作成を取得しています。シナリオでは、ルートディレクトリの下に複数のサブディレクトリが継続的に存在する可能性があります。 SI 4.3.1のWatchServiceDirectoryScannerを使用して、新しいサブディレクトリ内に作成されたファイルを取得します。最初のポーリングでWatchServiceDirectoryScannerが最初のポーリング後に作成された新しいファイルを取得しない

@Bean 
public MessageSource<File> fileReadingMessageSource() { 

    CompositeFileListFilter<File> filters = new CompositeFileListFilter<>(); 
    filters.addFilter(new SimplePatternFileListFilter("pattern*")); 
    //filters.addFilter(new LastModifiedFileListFilter()); 

    FileReadingMessageSource fileSource = new FileReadingMessageSource(); 

    String filePath = "root-directory"; 

    fileSource.setDirectory(new File(filePath)); 
    fileSource.setFilter(filters); 
    fileSource.setUseWatchService(true); 
    fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,FileReadingMessageSource.WatchEventType.MODIFY,FileReadingMessageSource.WatchEventType.DELETE); 

    return fileSource; 
} 

@Bean 
public IntegrationFlow readDirectoryFlow() { 

    return IntegrationFlows.from(
      fileReadingMessageSource(), 
      e -> e.poller(Pollers.cron("*/5 * * * * *"))) 
      .channel(fileInputChannel()) 
      .handle(tailerRestart) 
      .handle(System.out::println) 
      .get(); 
} 

は、パターンに一致するすべてのファイルは、メッセージリソースを経由して利用可能ですが、新しいファイルは、新しいサブディレクトリの後半で作成した場合は、[メッセージリソースが新しいパターンに一致するファイルを選択することができません。

私はログ

に次のDEBUGメッセージ

DEBUG SourcePollingChannelAdapterを参照してください - は失われる可能性がどのような「偽の」

を返して、世論調査の際に何のメッセージを受信して​​いませんか?

答えて

1

私はちょうどあなたのコードに非常に近いいくつかのテストケースを書いてきました:

@Bean 
    public MessageSource<File> fileReadingMessageSource() { 
     CompositeFileListFilter<File> filters = new CompositeFileListFilter<>(); 
     filters.addFilter(new SimplePatternFileListFilter("*.watch")); 

     FileReadingMessageSource fileSource = new FileReadingMessageSource(); 
     fileSource.setDirectory(tmpDir.getRoot()); 
     fileSource.setFilter(filters); 
     fileSource.setUseWatchService(true); 
     fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE, 
       FileReadingMessageSource.WatchEventType.MODIFY, 
       FileReadingMessageSource.WatchEventType.DELETE); 
     return fileSource; 
    } 

    @Bean 
    public IntegrationFlow readDirectoryFlow() { 
     return IntegrationFlows 
       .from(fileReadingMessageSource(), 
         e -> e.poller(p -> p.cron("*/1 * * * * *"))) 
       .handle(System.out::println) 
       .get(); 
    } 

テストコードは次のようになります。

@ClassRule 
public static final TemporaryFolder tmpDir = new TemporaryFolder(); 

@Test 
public void testWatchServiceMessageSource() throws Exception { 
    File newFolder1 = tmpDir.newFolder(); 
    FileOutputStream file = new FileOutputStream(new File(newFolder1, "foo.watch")); 
    file.write(("foo").getBytes()); 
    file.flush(); 
    file.close(); 

    File newFolder2 = tmpDir.newFolder(); 
    file = new FileOutputStream(new File(newFolder2, "bar.watch")); 
    file.write(("bar").getBytes()); 
    file.flush(); 
    file.close(); 

    file = new FileOutputStream(new File(tmpDir.getRoot(), "root.watch")); 
    file.write(("root").getBytes()); 
    file.flush(); 
    file.close(); 

    Thread.sleep(10000); 
} 

そして、私はこれがログに記録があります。

GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit7776799219532481336\foo.watch, headers={id=50d44197-e0af-708a-6b61-2a2cfeec68da, timestamp=1473686655061}] 
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit813088196038861528\bar.watch, headers={id=8d80c853-19b6-f667-7950-d6de49d509ab, timestamp=1473686656062}] 
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\root.watch, headers={id=e585203b-41dc-cadb-6a36-4c9009a34701, timestamp=1473686657063}] 
を毎秒

わからない問題がある...

あなたは.channel(fileInputChannel())は必要ありません。 ednpointの間に自動的に作成されます。

設定と

:あなたはtailerRestartが何かを返すことを確認する必要があります

.handle(tailerRestart) 
.handle(System.out::println) 

。 、当社の他の議論によると、それはありませんが:

@ServiceActivator 
public void restartTailer(File input) throws Exception { 
    tailFileProducer.stop(); 
    tailFileProducer.setFile(input); 
    tailFileProducer.start(); 
} 

UPDATE

我々は問題が春のクラウドストリームインフラストラクチャによってFileReadingMessageSource.start()と呼ばれる数回であることを考え出したいくつかの民間調査した後、内部のWatchServiceオブジェクトを再インスタンス化します。

FileReadingMessageSource.start()を冪等に固定する必要があります。https://github.com/spring-cloud/spring-cloud-stream/issues/525https://jira.spring.io/browse/INT-4108

春クラウドストリームは、バージョン1.1で修正されています。

回避策はFileReadingMessageSource.start()は一度だけ呼び出されることを保証するようなものです:コードをテストするための

FileReadingMessageSource fileSource = new FileReadingMessageSource() { 

    private final AtomicBoolean running = new AtomicBoolean(); 

    @Override 
    public void start() { 
     if (!this.running.getAndSet(true)) { 
     super.start(); 
     } 
    } 

    @Override 
    public void stop() { 
     if (this.running.getAndSet(false)) { 
     super.stop(); 
     } 
    } 

}; 
+0

感謝。ファイルを返すためにrestartTailerサービスアクティベータを変更しました。 WatchServiceDirectoryScannerは、FileReadingMessageSourceでQueue toBeReceivedを1回だけ設定していると感じます。ファイル/サブディレクトリが後で追加された場合、新しいサブディレクトリ内のファイルのメッセージは表示されません。WatchServiceDirectoryScannerに無計画に投票してJava 7 WatchServiceのすべてのイベントを監視していますか? –

+1

私の答えで 'UPDATE'を見つけてください。 –

関連する問題