2016-04-11 19 views
4

「Apache Camelでキャッシュを使用する方法と大容量ファイルを処理する方法」というテーマで取り組んでいます。キャッシュストリーミング大容量ファイルを扱う

目的は、ラージファイルを5 GOを超える膨大なファイルであるため、ファイルをメモリに読み込まずに処理することです。

複数のトラックが見つかりました。最初のトラックはスプリッタコンポーネントを使用してファイルをブロックごとに1行ずつ読み込むことができますが、スプリッタを使用すると再び読み込むことはできませんファイルが最初から存在する場合、スプリットが終了してもファイルの一部を読み取ることが機能的に必要になります。

キャッシュシステムを使用して、ブロックをキャッシュに入れて再利用する必要があります。

私たちは、CachedOutputStreamクラスを使用してスプリッタの後のファイルの一部をディスクに書き込むことが義務付けられていると考えました。このクラスは、ディスク上のデータを暗号化する機能も提供します。

例以下:

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="false" streamCache="true"> 

    <streamCaching id="myCacheConfig" spoolDirectory="target/cachedir" spoolThreshold="16"/> 

    <route id="SPLIT-FLOW" streamCache="true"> 
     <from uri="file:src/data/forSplitCaching\SimpleRecord?noop=true"/> 
     <split streaming="true"> 
      <tokenize token="\n"/> 
      <to uri="direct:PROCESS-BUSINESS"/> 
     </split> 
    </route> 

    <route id="PROCESS-BUSINESS" streamCache="true"> 
     <from uri="direct:PROCESS-BUSINESS"/> 
     <bean ref="ProcessBusiness" method="dealRecord"/> 
     <choice> 
      <when> 
       <simple>${in.header.CamelSplitComplete} == "true"</simple> 
       <to uri="direct:STREAM-CACHING"/> 
      </when> 
     </choice> 
    </route> 

    <route id="STREAM-CACHING"> 
     <from uri="direct:STREAM-CACHING"/> 
     <bean ref="ProcessStreamCaching" method="usingStream"/> 
     <setHeader headerName="CamelFileName"> 
      <simple>${header.CamelFileName}.${header.CamelSplitIndex}</simple> 
     </setHeader> 
     <to uri="file:src/out"/> 
    </route> 

</camelContext> 

dealRecordがキャッシュに分割さ各行置く方法:方法usingStreamは、各キャッシュを使用することができ

public void dealRecord(Exchange exchange) throws Exception { 

    String body; 
    File file; 
    String[] files; 
    boolean isSplitComplete; 

    body = (String) exchange.getIn().getBody(); 
    isSplitComplete = (boolean) exchange.getProperties().get("CamelSplitComplete"); 

    CachedOutputStream cos = new CachedOutputStream(exchange, false); 
    cos.write(body.getBytes("UTF-8")); 

    file = new File("target/cachedir"); 
    files = file.list(); 
    for (String nameTmpfile : files) { 
     LOG.info("Genered File [" + nameTmpfile + "]"); 
    } 

    lstCache.add(cos); 

    if(isSplitComplete){ 
     exchange.getIn().setHeader("Cached",lstCache); 
    } 
} 

ヘッダ内に存在する

public byte[] usingStream(Exchange exchange) throws InputStreamException { 

    final ArrayList<CachedOutputStream> lstcache; 
    byte[] bytesMessage; 
    StringBuilder messageCompleteOut = new StringBuilder(); 
    InputStream is = null; 

    lstcache = (ArrayList<CachedOutputStream>) exchange.getIn().getHeader("Cached"); 
    for (CachedOutputStream oneCache : lstcache) { 
     try { 
     is = oneCache.getWrappedInputStream(); 
     String messageInputstream = toString(is); 
     LOG.info("Message of Cache ["+ messageInputstream +"]"); 
     messageCompleteOut.append(messageInputstream); 
     messageCompleteOut.append(System.lineSeparator()); 
     } catch (IOException e) { 
     LOG.error(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL); 
     throw new InputStreamException(InputStreamException.ERROR_MANIPULATING_INPUT_STREAM_CHANNEL,e); 
     } 
     // On ferme le flux 
     IOHelper.close(is); 
    } 
    bytesMessage = messageCompleteOut.toString().getBytes(Charset.forName("UTF-8")); 
    return bytesMessage; 
} 

解決策は大丈夫ですか?またはもっと良い方法があるかもしれませんか?

thxs

答えて

0

GenericFileMessage(ファイル・コンポーネントによって使用されるメッセージ実装)必要なメモリunlesにファイルの内容をロードしません。実際には、身体にアクセスして強制的に変換しないようにするだけです。独自のメッセージ(GenericFileMessageから継承)を作成し、そのような変換を防ぎ、別のもの(何らかの "ダイジェスト")を返すこともできます。

途中のプロセッサはファイルシステム内のファイルの場所を(メッセージヘッダーから)取得して直接開くことができ、ファイルメッセージを別のメッセージで置き換えることができます。

+0

このクラスGenericFileMessageを使用すると、デフォルトでファイルがメモリにロードされ、特定のキャッシュにその施設を指定するために使用できるすべてのメソッドを管理できます。 – Kikou

+0

この回答を見るhttp://stackoverflow.com/a/9389465/2956532 ファイルの内容がメモリに読み込まれません。 Camelは、実際にはWrappedFileインスタンスをメッセージ本文に渡します。 –

関連する問題