0

私は 'spring-cloud-dataflow'という新しいプロジェクトを開始しています。プロセッサは複数の同じペイロードを持つメッセージを複数回受信します

これは、ファイルソースからのファイルをuntarするプロセッサです。このアプリケーションは、tarおよびgunzipファイル圧縮を処理する機能を備えたカスタマイズされたバージョンのintegration-zipを使用します。

私の問題は次のようなものです。私のソースがファイル参照で1つのメッセージを送信している間に、プロセッサはそれらのメッセージを複数回、同じペイロードではなく異なるIDで受信します。

Here the log file of both component

あなたはファイルを見ることができるようにメッセージに対してのみ生成:

2017-10-02 12:38:28.013 INFO 17615 --- [ask-scheduler-3] o.s.i.file.FileReadingMessageSource  : Created message: [GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={id=0b99b840-e3b3-f742-44ec-707aeea638c8, timestamp=1506940708013}]] 

プロデューサーが着信3メッセージ有している:

2017-10-02 12:38:28.077 INFO 17591 --- [   -L-1] o.s.i.codec.kryo.CompositeKryoRegistrar : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer 
2017-10-02 12:38:28.080 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=1 
a4d4b9c-86fe-d3a8-d800-8013e8ae7027, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940708079}]' unpacking started... 
2017-10-02 12:38:28.080 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress 
2017-10-02 12:38:29.106 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=c 
d611ca4-4cd9-0624-0871-dcf93a9a0051, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940709106}]' unpacking started... 
2017-10-02 12:38:29.107 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress 
2017-10-02 12:38:31.108 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}]' unpacking started... 
2017-10-02 12:38:31.108 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress 
2017-10-02 12:38:31.116 ERROR 17591 --- [   -L-1] o.s.integration.handler.LoggingHandler : org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.io.FileNotFoundException: /tmp/patent/CNINO_im_201733_batch108.tgz (File o directory non esistente), failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}], failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}] 
     at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44) 

を私はこの問題への解決策を見つけることができません誰もが同じ問題を抱えていて、それを修正する方法を見つけましたか?それとも、私が見逃す構成はありますか?

EDIT:

私は、同じファイルシステム上のIOファイル操作の仕事ので、SDFSバージョン1.2.2.RELEASEのローカルバージョンを使用している、と私はSCSのバージョンDitmars.BUILD-SNAPSHOTを使用しています。

残念ながら、ファイル削除操作アプリケーションを無効にすると、このアプリケーションはメッセージを複数回処理します。

@Override 
    protected Object doCompressTransform(final Message<?> message) throws Exception { 
    logger.info(String.format("Message '%s' unpacking started...", message)); 

    try (InputStream checkMessage = checkMessage(message); 
     InputStream inputStream = (gzCompression ? new BufferedInputStream(new GZIPInputStream(checkMessage)) : new BufferedInputStream(checkMessage))) { 

     final Object payload = message.getPayload(); 
     final Object unzippedData; 

     try (TarArchiveInputStream tarIn = new TarArchiveInputStream(inputStream)){   
     TarArchiveEntry entry = null; 

     final SortedMap<String, Object> uncompressedData = new TreeMap<String, Object>(); 

     while ((entry = (TarArchiveEntry) tarIn.getNextEntry()) != null) { 

      final String zipEntryName = entry.getName(); 
      final Date zipEntryTime = entry.getLastModifiedDate(); 
      final long zipEntryCompressedSize = entry.getSize(); 

      final String type = entry.isDirectory() ? "directory" : "file"; 

      final File tempDir = new File(workDirectory, message.getHeaders().getId().toString()); 
      tempDir.mkdirs(); // NOSONAR false positive 

      final File destinationFile = new File(tempDir, zipEntryName); 

      if (entry.isDirectory()) { 
      destinationFile.mkdirs(); // NOSONAR false positive 
      } 
      else { 
      unpackEntries(tarIn, entry, tempDir); 
      uncompressedData.put(zipEntryName, destinationFile); 
      } 
     } 

     if (uncompressedData.isEmpty()) { 
      unzippedData = null; 
     } 
     else { 
      if (this.expectSingleResult) { 
      if (uncompressedData.size() == 1) { 
       unzippedData = uncompressedData.values().iterator().next(); 
      } 
      else { 
       throw new MessagingException(message, String.format("The UnZip operation extracted %s " 
         + "result objects but expectSingleResult was 'true'.", uncompressedData.size())); 
      } 
      } 
      else { 
      unzippedData = uncompressedData; 
      } 

     } 

     logger.info("Payload unpacking completed..."); 
     } 
     finally { 
     if (payload instanceof File && this.deleteFiles) { 
      final File filePayload = (File) payload; 
      if (!filePayload.delete() && logger.isWarnEnabled()) { 
      if (logger.isWarnEnabled()) { 
       logger.warn("failed to delete File '" + filePayload + "'"); 
      } 
      } 
     } 
     } 
     return unzippedData; 
    } 
    catch (Exception e) { 
     throw new MessageHandlingException(message, "Failed to apply Zip transformation.", e); 
    } 
} 

例外:

@EnableBinding(Processor.class) 
@EnableConfigurationProperties(UnTarProperties.class) 
public class UnTarProcessor { 

    @Autowired 
    private UnTarProperties properties; 

    @Autowired 
    private Processor processor; 

    @Bean 
    public UncompressedResultSplitter splitter() { 
    return new UncompressedResultSplitter(); 
    } 

    @Bean 
    public UnTarGzTransformer transformer() { 
    UnTarGzTransformer unTarGzTransformer = new UnTarGzTransformer(properties.isUseGzCompression()); 
    unTarGzTransformer.setExpectSingleResult(properties.isSingleResult()); 
    unTarGzTransformer.setWorkDirectory(new File(properties.getWorkDirectory())); 
    unTarGzTransformer.setDeleteFiles(properties.isDeleteFile()); 

    return unTarGzTransformer; 
    } 

    @Bean 
    public IntegrationFlow process() { 

    return IntegrationFlows.from(processor.input()) 
     .transform(transformer()) 
     .split(splitter()) 
     .channel(processor.output()) 
     .get(); 
    } 
} 

これは、ファイルを解凍するために使用されるコアメソッドです:これは私のプロセッサクラスは

である:ここではいくつかのコードスニペットは、あなた、私、これは私のプロジェクトrepoであるようにメソッドcheckmessage()によってスローされます

protected InputStream checkMessage(Message<?> message) throws FileNotFoundException { 
     logger.info("Check message's payload type to decompress"); 

     InputStream inputStream; 
     Object payload = message.getPayload(); 

     if (payload instanceof File) { 
     final File filePayload = (File) payload; 

      if (filePayload.isDirectory()) { 
       throw new UnsupportedOperationException(String.format("Cannot unzip a directory: '%s'", 
         filePayload.getAbsolutePath())); 
      } 

      inputStream = new FileInputStream(filePayload); 
     } 
     else if (payload instanceof InputStream) { 
      inputStream = (InputStream) payload; 
     } 
     else if (payload instanceof byte[]) { 
      inputStream = new ByteArrayInputStream((byte[]) payload); 
     } 
     else { 
      throw new IllegalArgumentException(String.format("Unsupported payload type '%s'. " + 
        "The only supported payload types are java.io.File, byte[] and java.io.InputStream", 
        payload.getClass().getSimpleName())); 
     } 

     return inputStream; 
} 

本当にありがとう助けてください。 ありがとうございました

答えて

1

さらに詳しい情報が必要です。 SCDFおよびSCSアプリのバージョン。少なくともあなたのアプリケーションをどのように配備したかに関するあなたのDSL。

ログを確認したところ、FileNotFoundExceptionのために消費者がメッセージを消費していないことに気付きましたか?同じメッセージを複数回受け取っていないため、SCSは失敗する前に再配信を試みています。ログ全体と、指定された場所でのファイルのオープン方法を確認してください。

+0

'ファイル参照付きの単一のメッセージ 'とは何も関係していません。まあ、ファイルにアクセスするには、あなたのプロセッサーアプリが同じファイルシステム上にあることを確認する必要があります。それ以外の場合は、別のマシンに '/ tmp/patent/CNINO_im_201733_batch108.tgz'ファイルがないことが明らかです。 –

+0

私はローカルバージョンのSCDFを使用していますが、ファイルは両方のアプリケーションが読み取り可能な一時ディレクトリに保存されています。 FNF例外は、ファイルiが最初のメッセージ配信後に処理されたときにスローされます。私は私の構成とコードに関する投稿の詳細を編集しました –

0

エラーがロジックに存在するため、SCSの再試行設定のためにメッセージが何度も表示される続くことは難しい。それはFileNotFoundExceptionと言っていますあなたのプロセスがファイルをそこに置くのはわかりませんが、その理由があります。 SCS

関連する問題