2017-04-08 10 views
2

Javaプログラムは、gzipを使用して圧縮された非常に大きなファイルをWebソケット上のクライアントから取得する必要があるサーバーを実装しています。JavaはGZIPストリームを順番に解凍します。

クライアントは独自のプロトコルに埋め込まれたファイルチャンクを送信するので、クライアントからのメッセージの後にメッセージが表示され、メッセージを解析してgzippedファイルの内容を抽出します。

ファイル全体をプログラムメモリに保持できないため、各チャンクを解凍し、データを処理して次のチャンクに進みます。

私は次のコードを使用しています:

public static String gzipDecompress(byte[] compressed) throws IOException { 
    String uncompressed; 
    try (
     ByteArrayInputStream bis = new ByteArrayInputStream(compressed); 
     GZIPInputStream gis = new GZIPInputStream(bis); 
     Reader reader = new InputStreamReader(gis); 
     Writer writer = new StringWriter() 
    ) { 

     char[] buffer = new char[10240]; 
     for (int length = 0; (length = reader.read(buffer)) > 0;) { 
     writer.write(buffer, 0, length); 
     } 
     uncompressed = writer.toString(); 
    } 

    return uncompressed; 
    } 

をしかし、最初に圧縮されたチャンクで関数を呼び出すときに、私は次の例外を取得しています:

java.io.EOFException: Unexpected end of ZLIB input stream 
    at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240) 
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158) 
    at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117) 
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) 
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) 
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
    at java.io.InputStreamReader.read(InputStreamReader.java:184) 
    at java.io.Reader.read(Reader.java:140) 

それは私ことを言及することが重要ですチャンクをスキップせず、チャンクを順番に解凍しようとしません。

私には何が欠けていますか?

+1

このデータがどこから来るのかは不明です。 *すべての*データを読み込むストリームを1つ作成し、GZipInputStreamでラップする必要があります。メモリ*にすべてのデータ*を入れる必要はありませんが、単一のストリームでなければなりません。 –

答えて

2

問題は、これらのチャンクを手動で再生することです。

InputStreamを入手し、GZIPInputStreamとラップし、次にデータを読み取ることが正しい方法です。あなただけのreaderから一度に10キロバイトに言わせれば、ストリーム方式で

InputStream is = // obtain the original gzip stream 

    GZIPInputStream gis = new GZIPInputStream(is); 
    Reader reader = new InputStreamReader(gis); 

    //... proceed reading and so on 

GZIPInputStream作品は、そう、全体的なメモリフットプリントは関係なく、初期GZIPファイルのサイズのローになります。質問が

を更新しました

を更新した後、状況に可能な解決策は、あなたのクライアントプロトコルハンドラによってチャンクでそれに置かれているバイトストリームInputStream実装を記述することです。ここで

はプロトタイプである:あなたの伸長コードは(Reader経由)このストリームからデータを引き出しながら

public class ProtocolDataInputStream extends InputStream { 
    private BlockingQueue<byte[]> nextChunks = new ArrayBlockingQueue<byte[]>(100); 
    private byte[] currentChunk = null; 
    private int currentChunkOffset = 0; 
    private boolean noMoreChunks = false; 

    @Override 
    public synchronized int read() throws IOException { 
     boolean takeNextChunk = currentChunk == null || currentChunkOffset >= currentChunk.length; 
     if (takeNextChunk) { 
      if (noMoreChunks) { 
       // stream is exhausted 
       return -1; 
      } else { 
       currentChunk = nextChunks.take(); 
       currentChunkOffset = 0; 
      } 
     } 
     return currentChunk[currentChunkOffset++]; 
    } 

    @Override 
    public synchronized int available() throws IOException { 
     if (currentChunk == null) { 
      return 0; 
     } else { 
      return currentChunk.length - currentChunkOffset; 
     } 
    } 

    public synchronized void addChunk(byte[] chunk, boolean chunkIsLast) { 
     nextChunks.add(chunk); 
     if (chunkIsLast) { 
      noMoreChunks = true; 
     } 
    } 
} 

あなたのクライアントプロトコルハンドラは、addChunk()を使用して、バイトチャンクを追加します。

このコードはいくつかの問題があることに注意してください:使用されて

  1. キューは、限られたサイズを持っています。 addChunk()があまりにも頻繁に呼び出されている場合は、キューがいっぱいになり、addChunk()をブロックします。これは望ましくないかもしれない。
  2. read()メソッドのみが説明のために実装されています。パフォーマンスを向上させるには、同じ方法でread(byte[])を実装する方がよいでしょう。
  3. リーダ(デコンプレッサ)とライタ(プロトコルハンドラを呼び出すaddChunk())が異なるスレッドであるという前提の下、保守的な同期化が使用されています。
  4. InterruptedExceptionは、あまり詳細を避けるため、take()で処理されません。

あなたのデコンプレッサおよびaddChunk()が(同じループ内で)同じスレッドで実行する場合は、Readerを引いたときにInputStreamまたはReader.ready()を使用して引っ張ったときにInputStream.available()メソッドを使用することを試みることができます。

+1

ByteArrayInputStreamまたはバイト配列をラップする他のInputStreamをGZIPInputStreamに渡すInputStreamとして使用できませんか?私の状況では、私は実際にサーバーからのデータを取得する元のInputSteamを使用することはできません。 – Eldad

+0

元の 'InputStream'を使用できないのはなぜですか?私が知っているバイトで 'GZIPInputStream'を供給するための唯一の安全な方法は、すべてのバイトをメモリに最初に読み込むことです。これは大きなファイルでは必要ありません。 –

+0

状況をより詳しく説明するために詳細を追加しました。私は独自のプロトコルの中に埋め込まれたファイルチャンクを取得していますので、InputStreamは完全なプロトコルメッセージを取得し、解析してからファイルチャンクを抽出してから、クライアントを制御せず、次のファイルチャンクを含む次のメッセージがいつ到着するかを知らない。ありがとう、悪い記述には申し訳ありません。 – Eldad

0

gzippedストリームからの任意のバイト列は、有効なスタンドアロンgzipデータではありません。いずれかの方法で、すべてのバイトチャンクを連結する必要があります。

最も簡単な方法は、単純なパイプでそれらすべてを蓄積することである。

import java.io.PipedOutputStream; 
import java.io.IOException; 
import java.util.zip.GZIPInputStream; 

public class ChunkInflater { 
    private final PipedOutputStream pipe; 

    private final InputStream stream; 

    public ChunkInflater() 
    throws IOException { 
     pipe = new PipedOutputStream(); 
     stream = new GZIPInputStream(new PipedInputStream(pipe)); 
    } 

    public InputStream getInputStream() { 
     return stream; 
    } 

    public void addChunk(byte[] compressedChunk) 
    throws IOException { 
     pipe.write(compressedChunk); 
    } 
} 

今、あなたはあなたが望むものは何でも単位で読むことができるInputStreamを持っています。たとえば、次のようになります。

ChunkInflater inflater = new ChunkInflater(); 

Callable<Void> chunkReader = new Callable<Void>() { 
    @Override 
    public Void call() 
    throws IOException { 
     byte[] chunk; 
     while ((chunk = readChunkFromSource()) != null) { 
      inflater.addChunk(chunk); 
     } 

     return null; 
    } 
}; 
ExecutorService executor = Executors.newSingleThreadExecutor(); 
executor.submit(chunkReader); 
executor.shutdown(); 

Reader reader = new InputStreamReader(inflater.getInputStream()); 
// read text here 
関連する問題