私はちょうどあなたのコードに非常に近いいくつかのテストケースを書いてきました:
@Bean
public MessageSource<File> fileReadingMessageSource() {
CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
filters.addFilter(new SimplePatternFileListFilter("*.watch"));
FileReadingMessageSource fileSource = new FileReadingMessageSource();
fileSource.setDirectory(tmpDir.getRoot());
fileSource.setFilter(filters);
fileSource.setUseWatchService(true);
fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,
FileReadingMessageSource.WatchEventType.MODIFY,
FileReadingMessageSource.WatchEventType.DELETE);
return fileSource;
}
@Bean
public IntegrationFlow readDirectoryFlow() {
return IntegrationFlows
.from(fileReadingMessageSource(),
e -> e.poller(p -> p.cron("*/1 * * * * *")))
.handle(System.out::println)
.get();
}
テストコードは次のようになります。
@ClassRule
public static final TemporaryFolder tmpDir = new TemporaryFolder();
@Test
public void testWatchServiceMessageSource() throws Exception {
File newFolder1 = tmpDir.newFolder();
FileOutputStream file = new FileOutputStream(new File(newFolder1, "foo.watch"));
file.write(("foo").getBytes());
file.flush();
file.close();
File newFolder2 = tmpDir.newFolder();
file = new FileOutputStream(new File(newFolder2, "bar.watch"));
file.write(("bar").getBytes());
file.flush();
file.close();
file = new FileOutputStream(new File(tmpDir.getRoot(), "root.watch"));
file.write(("root").getBytes());
file.flush();
file.close();
Thread.sleep(10000);
}
そして、私はこれがログに記録があります。
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit7776799219532481336\foo.watch, headers={id=50d44197-e0af-708a-6b61-2a2cfeec68da, timestamp=1473686655061}]
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit813088196038861528\bar.watch, headers={id=8d80c853-19b6-f667-7950-d6de49d509ab, timestamp=1473686656062}]
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\root.watch, headers={id=e585203b-41dc-cadb-6a36-4c9009a34701, timestamp=1473686657063}]
を毎秒
わからない問題がある...
あなたは.channel(fileInputChannel())
は必要ありません。 ednpointの間に自動的に作成されます。
設定と
:あなたはtailerRestart
が何かを返すことを確認する必要があります
.handle(tailerRestart)
.handle(System.out::println)
。 、当社の他の議論によると、それはありませんが:
@ServiceActivator
public void restartTailer(File input) throws Exception {
tailFileProducer.stop();
tailFileProducer.setFile(input);
tailFileProducer.start();
}
UPDATE
我々は問題が春のクラウドストリームインフラストラクチャによってFileReadingMessageSource.start()
と呼ばれる数回であることを考え出したいくつかの民間調査した後、内部のWatchService
オブジェクトを再インスタンス化します。
FileReadingMessageSource.start()
を冪等に固定する必要があります。https://github.com/spring-cloud/spring-cloud-stream/issues/525:https://jira.spring.io/browse/INT-4108
春クラウドストリームは、バージョン1.1
で修正されています。
回避策はFileReadingMessageSource.start()
は一度だけ呼び出されることを保証するようなものです:コードをテストするための
FileReadingMessageSource fileSource = new FileReadingMessageSource() {
private final AtomicBoolean running = new AtomicBoolean();
@Override
public void start() {
if (!this.running.getAndSet(true)) {
super.start();
}
}
@Override
public void stop() {
if (this.running.getAndSet(false)) {
super.stop();
}
}
};
感謝。ファイルを返すためにrestartTailerサービスアクティベータを変更しました。 WatchServiceDirectoryScannerは、FileReadingMessageSourceでQueue toBeReceivedを1回だけ設定していると感じます。ファイル/サブディレクトリが後で追加された場合、新しいサブディレクトリ内のファイルのメッセージは表示されません。WatchServiceDirectoryScannerに無計画に投票してJava 7 WatchServiceのすべてのイベントを監視していますか? –
私の答えで 'UPDATE'を見つけてください。 –