2016-08-24 3 views
1

私はunix location.Onceでファイルを探し続ける必要があります。それが利用可能な場合は、解析してjsonフォーマットに変換する必要があります。 Spring統合 - DSLを使用して行われます。続き は、私は春サイトから得たコードの一部ですが、それは例外で、次を示しています。以下はDSLを使用してUnixロケーションからファイルを読み込むためのSpringインテグレーション

o.s.integration.handler.LoggingHandler: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.processFileChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 

はコードです:

@SpringBootApplication 
public class FileReadingJavaApplication { 

    public static void main(String[] args) { 
     new SpringApplicationBuilder(FileReadingJavaApplication.class) 
      .web(false) 
      .run(args); 
    } 

    @Bean 
    public IntegrationFlow fileReadingFlow() { 
     return IntegrationFlows 
        .from(s -> s.file(new File("Y://")) 
           .patternFilter("*.txt"), 
          e -> e.poller(Pollers.fixedDelay(1000))) 
        .transform(Transformers.fileToString()) 
        .channel("processFileChannel") 
        .get(); 
     } 

} 

新しいコード:

@SpringBootApplication パブリッククラスSpringIntegration {

public static void main(String[] args) { 
    new SpringApplicationBuilder(SpringIntegration.class) 
    .web(false) 
    .run(args); 
} 

@Bean 
public SessionFactory<LsEntry> sftpSessionFactory() { 
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); 
    factory.setHost("ip"); 
    factory.setPort(port); 
    factory.setUser("username"); 
    factory.setPassword("pwd"); 
    factory.setAllowUnknownKeys(true); 
    return new CachingSessionFactory<LsEntry>(factory); 
} 

@Bean 
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { 
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); 
    fileSynchronizer.setDeleteRemoteFiles(false); 
    fileSynchronizer.setRemoteDirectory("remote dir"); 
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt")); 

    return fileSynchronizer; 
} 
@Bean 
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) 
public MessageSource ftpMessageSource() { 
    SftpInboundFileSynchronizingMessageSource source = 
      new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); 
    source.setLocalFilter(new AcceptOnceFileListFilter<File>()); 
    source.setLocalDirectory(new File("Local directory")); 

    return source; 
} 

@Bean 
@ServiceActivator(inputChannel = "fileInputChannel") 
public MessageHandler handler() { 
    return new MessageHandler() { 


     @Override 
     public void handleMessage(Message<?> message) throws MessagingException { 
      System.out.println("File Name : "+message.getPayload()); 

     } 

    }; 
} 

@Be public static StandardIntegrationFlow processFileFlow(){ 戻り値IntegrationFlows .from( "fileInputChannel")。split() .handle( "fileProcessor"、 "process")。get();

} 

@Bean 
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000")) 
public MessageSource<File> fileReadingMessageSource() { 
    AcceptOnceFileListFilter<File> filters =new AcceptOnceFileListFilter<>(); 

    FileReadingMessageSource source = new FileReadingMessageSource(); 
    source.setAutoCreateDirectory(true); 
    source.setDirectory(new File("Local directory")); 
    source.setFilter(filters); 

    return source; 
} 
@Bean 
public FileProcessor fileProcessor() { 
    return new FileProcessor(); 
} 


@Bean 
    @ServiceActivator(inputChannel = "fileInputChannel") 
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) { 
     AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); 
     outbound.setExpectReply(true); 
     outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' 
     return outbound; 
    } 



    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") 
    public interface MyGateway { 
     String sendToRabbit(String data); 

    } 

}

FileProcessor:

パブリッククラスFileProcessor {

public void process(Message<String> msg) { 
    String content = msg.getPayload(); 
    JSONObject jsonObject ; 
    Map<String, String> dataMap = new HashMap<String, String>(); 
    for(int i=0;i<=content.length();i++){ 
    String userId = content.substring(i+5,i+16); 


    dataMap = new HashMap<String, String>(); 

    dataMap.put("username", username.trim()); 


    i+=290; //each record of size 290 in file 
    jsonObject = new JSONObject(dataMap); 
    System.out.println(jsonObject); 

    } 

} 

}

答えて

0

あなたのコードが正しいですが、例外は何か何が必要があることを示しています直接チャネル "processFileChannel"からメッセージを読み込みます。

でさまざまな種類のチャンネルをご覧ください。春の統合における第一級オブジェクトの

EDIT

一つMessageChannel抽象化です。詳細については、EIPを参照してください。

.channel("processFileChannel")のような定義は、DirectChannelと宣言しています。この種のチャネルは、送信時にメッセージを受け取り、を直接sendプロセスで処理することを意味します。生のJava言葉では、次のように聞こえるかもしれません。別のものがautowiredされていない場合はNPEを投げてください。

出力にDirectChannelを使用する場合は、のサブスクライバを宣言する必要があります。あなたのロジックは何か分かりませんが、それはどのように動作し、他の選択肢を修正することはありませんDispatcher has no subscribers for channel

他のMessageChannelタイプも使用できますが、しかし、この目的のために、より多くのdocを読んでください。マークフィッシャーのSpring Integration in Action

+0

実際、私はSpring Integrationの新機能です。私の要求に応じて変更が必要な場合は、少し難解です。 – user

+0

あなたがやっていることは、ファイルの内容を "processFileChannel"に送信することですが、そのデータを消費する何かが必要です - チャンネルから始まるいくつかのフロー。 –

+0

私の答えに 'EDIT'を見つけてください。 –

関連する問題