0

私はSpringクラウドのデータフローでSFTP Sourceを使用していますが、sftp:remote-dir:/ home/someone/sourceにファイルを定義するために働いています。リモートディレクトリの下にある多くのサブフォルダと、このディレクトリの下にあるpattenと一致するすべてのファイルを再帰的に取得します。私はfilename-regex:を使用しようとしていますが、これまでは1つのレベルでしか動作しません。どのように再帰的に必要なファイルを取得するのですか?Spring Cloud App Starter、sftp source、ファイルのディレクトリを再帰

答えて

2

受信チャネルアダプタは再帰をサポートしません。再帰(-R)を使用して、outbound gatewayのカスタムソースをMGETコマンドで使用します。

docにはこのオプションがありません。 current docsで修正されました。

私はan issueを開いて標準のアプリスターターを作成しました。私は、[リンクに基づいて、それをモデル化しようとしています

JavaのDSLで編集

...

@SpringBootApplication 
@EnableBinding(Source.class) 
public class So44710754Application { 

    public static void main(String[] args) { 
     SpringApplication.run(So44710754Application.class, args); 
    } 

    // should store in Redis or similar for persistence 
    private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>(); 

    @Bean 
    public IntegrationFlow flow() { 
     return IntegrationFlows.from(source(), e -> e.poller(Pollers.fixedDelay(30_000))) 
       .handle(gateway()) 
         .split() 
         .<File>filter(p -> this.processed.putIfAbsent(p.getAbsolutePath(), true) == null) 
         .transform(Transformers.fileToByteArray()) 
         .channel(Source.OUTPUT) 
         .get(); 
    } 

    private MessageSource<String> source() { 
     return() -> new GenericMessage<>("foo/*"); 
    } 

    private AbstractRemoteFileOutboundGateway<LsEntry> gateway() { 
     AbstractRemoteFileOutboundGateway<LsEntry> gateway = Sftp.outboundGateway(sessionFactory(), "mget", "payload") 
       .localDirectory(new File("/tmp/foo")) 
       .options(Option.RECURSIVE) 
       .get(); 
     gateway.setFileExistsMode(FileExistsMode.IGNORE); 
     return gateway; 
    } 

    private SessionFactory<LsEntry> sessionFactory() { 
     DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory(); 
     sf.setHost("10.0.0.3"); 
     sf.setUser("ftptest"); 
     sf.setPassword("ftptest"); 
     sf.setAllowUnknownKeys(true); 
     return new CachingSessionFactory<>(sf); 
    } 

} 

とJavaの設定...

@SpringBootApplication 
@EnableBinding(Source.class) 
public class So44710754Application { 

    public static void main(String[] args) { 
     SpringApplication.run(So44710754Application.class, args); 
    } 

    @InboundChannelAdapter(channel = "sftpGate", poller = @Poller(fixedDelay = "30000")) 
    public String remoteDir() { 
     return "foo/*"; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "sftpGate") 
    public SftpOutboundGateway mgetGate() { 
     SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sessionFactory(), "mget", "payload"); 
     sftpOutboundGateway.setOutputChannelName("splitterChannel"); 
     sftpOutboundGateway.setFileExistsMode(FileExistsMode.IGNORE); 
     sftpOutboundGateway.setLocalDirectory(new File("/tmp/foo")); 
     sftpOutboundGateway.setOptions("-R"); 
     return sftpOutboundGateway; 
    } 

    @Bean 
    @Splitter(inputChannel = "splitterChannel") 
    public DefaultMessageSplitter splitter() { 
     DefaultMessageSplitter splitter = new DefaultMessageSplitter(); 
     splitter.setOutputChannelName("filterChannel"); 
     return splitter; 
    } 

    // should store in Redis, Zookeeper, or similar for persistence 
    private final ConcurrentMap<String, Boolean> processed = new ConcurrentHashMap<>(); 

    @Filter(inputChannel = "filterChannel", outputChannel = "toBytesChannel") 
    public boolean filter(File payload) { 
     return this.processed.putIfAbsent(payload.getAbsolutePath(), true) == null; 
    } 

    @Bean 
    @Transformer(inputChannel = "toBytesChannel", outputChannel = Source.OUTPUT) 
    public FileToByteArrayTransformer toBytes() { 
     FileToByteArrayTransformer transformer = new FileToByteArrayTransformer(); 
     return transformer; 
    } 

    private SessionFactory<LsEntry> sessionFactory() { 
     DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory(); 
     sf.setHost("10.0.0.3"); 
     sf.setUser("ftptest"); 
     sf.setPassword("ftptest"); 
     sf.setAllowUnknownKeys(true); 
     return new CachingSessionFactory<>(sf); 
    } 

} 
+0

と](https://github.com/spring-cloud/spring-cloud-stream-app-starters/blob/master/sftp/spring-cloud-starter-stream-source-sftp/src/main/java/org/ springframework/cloud/stream/app/sftp/source/SftpSourceConfigur ation.java)、「コンシューマ()」の設定に問題があります。 sftpSourceのように動作するように、どのように構造化すればよいか教えてください。ありがとう。 – gamepop

+1

いくつかの解決策については、私の答えを編集してください。 1つはjava dsl、もう1つはjava configです。 –

+0

これは完璧に動作します。私はまた、sftpリモートロケーションからピックアップした後にファイルを削除したい、delete-remote-filesオプションも欠けている。それを達成する最善の方法は何ですか、Outbound Gatewayを再度使用してrmコマンドを実行しますか? – gamepop

関連する問題