1つのFTPサーバー(source
)から別のFTPサーバー(target
)にファイルを送信できました。インバウンド・アダプターを使用してソースからローカル・ディレクトリーにファイルを送信し、アウトバウンド・アダプターを使用してローカル・ディレクトリーからターゲットにファイルを送信します。今のところこれはうまくいきます。1つのFTPサーバーでヘッダーを強化し、別のFTPサーバーでヘッダーを取得する
私は何を達成したいことは次のとおりです。(転送されsource
上のファイルを使用して生成される)ハッシュコードでsource
でメッセージのヘッダーを豊かにし、その後target
とマッチした時点でそのヘッダを取得しますここで(target
上のファイルを使用して生成される)ハッシュコード
と、それは私がこれまで試してみましたです:
Application.java
@SpringBootApplication
public class Application {
@Autowired
private Hashing hashing;
public static ConfigurableApplicationContext context;
public static void main(String[] args) {
context = new SpringApplicationBuilder(Application.class)
.web(false)
.run(args);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler sourceHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("Reply channel isssss:"+message.getHeaders().getReplyChannel());
Object payload = message.getPayload();
System.out.println("Payload: " + payload);
File file = (File) payload;
// enrich header with hash code before sending to target FTP
Message<?> messageOut = MessageBuilder
.withPayload(message.getPayload())
.copyHeadersIfAbsent(message.getHeaders())
.setHeaderIfAbsent("hashCode", hashing.getHashCode(file)).build();
// send to target FTP
System.out.println("Trying to send " + file.getName() + " to target");
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(messageOut);
}
};
}
}
FileTransferServiceConfig.java
@Configuration
@Component
public class FileTransferServiceConfig {
@Autowired
private ConfigurationService configurationService;
@Autowired
private Hashing hashing;
public static final String FILE_POLLING_DURATION = "5000";
@Bean
public SessionFactory<FTPFile> sourceFtpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost(configurationService.getSourceHostName());
sf.setPort(Integer.parseInt(configurationService.getSourcePort()));
sf.setUsername(configurationService.getSourceUsername());
sf.setPassword(configurationService.getSourcePassword());
return new CachingSessionFactory<>(sf);
}
@Bean
public SessionFactory<FTPFile> targetFtpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost(configurationService.getTargetHostName());
sf.setPort(Integer.parseInt(configurationService.getTargetPort()));
sf.setUsername(configurationService.getTargetUsername());
sf.setPassword(configurationService.getTargetPassword());
return new CachingSessionFactory<>(sf);
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(Message message);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(sourceFtpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(configurationService.getSourceDirectory());
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter(
configurationService.getFileMask()));
return fileSynchronizer;
}
@Bean
public AcceptOnceFileListFilter<File> acceptOnceFileListFilter() {
return new AcceptOnceFileListFilter<>();
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel",
poller = @Poller(fixedDelay = FILE_POLLING_DURATION))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source
= new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File(configurationService.getLocalDirectory()));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(acceptOnceFileListFilter());
return source;
}
// makes sure transfer continues on connection reset
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
advice.setOnFailureExpression("@acceptOnceFileListFilter.remove(payload)");
return advice;
}
@Bean
@ServiceActivator(inputChannel = "toFtpChannel")
public void listenOutboundMessage() {
// tried to subscribe to "toFtpChannel" but this was not triggered
System.out.println("Message received");
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel", adviceChain = "expressionAdvice")
public MessageHandler targetHandler() {
FtpMessageHandler handler = new FtpMessageHandler(targetFtpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(
configurationService.getTargetDirectory()));
return handler;
}
}
Hashing.java
public interface Hashing {
public String getHashCode(File payload);
}
私はsourceHandler()
にメッセージを豊かにするために管理している、メッセージを構築し、target
にそれを送ったが、私はでそのメッセージをどのように受け取ることができるかわかりません私はメッセージからヘッダを得ることができるように10?
これ以上の情報が必要な場合は教えてください。私は本当にあなたの助けに感謝します。
はい、ログには、実際には 'toFtpChannel 'チャネルに加入者として「{message-handler:fileTransferServiceConfig.listenOutboundMessage.serviceActivator}を追加しています」というメッセージが表示されます。 'listenOutboundMessage'にブレークポイントを設定しましたが、引き続きトリガーしません。 – Sibtain
'@ Bean'を' listenOutboundMessage'から削除しましたが、 'ServiceActivator'がまだトリガされていませんでした。'ftpChannel'に2人の加入者がいるからでしょうか?このコードを実行しているときにどのような変更を加えましたか?ありがとう – Sibtain
あなたが間違っていることを言うのは難しいです。私はあなたのコードをコピーし、私のサーバーを指すようにセッションの工場出荷時の設定を変更し、それは私のために正常に動作します。 2人の加入者を持つこととは何の関係もありません(間違っていますが)。デバッグロギングをオンにし、メッセージフローに従います。 –