2009-04-17 20 views
24

Javaで2つのInputStreamがある場合、それらをマージする方法があるので、両方のストリームの出力を与える1つのInputStreamで終了しますか?どうやって?私は考えることができることを2つの入力ストリームをJavaでどのようにマージしますか?

+3

どのように正確にマージしますか?最初のストリームを読み込んだ後、2番目のストリームからシームレスに読み込みを続けますか?私はJavaに慣れていませんが、C#では、Streamから継承したクラスを実装して、両方の基本ストリームへの参照を含むようにしてから、Readメソッドをオーバーライドすることで簡単に実行できます。 – Noldorin

答えて

37

コメントとして、それはあなたがマージによって何を意味するかは明らかではありません。

いずれかから「ランダムに」入力を得ることは、InputStream.availableによって複雑になり、必ずしもストリームの有効な回答とブロック動作を与えるとは限りません。ストリームから読み込み、次にjava.io.Piped(In|Out)putStreamなどのデータを戻すには2つのスレッドが必要です(これらのクラスには問題があります)。あるいは、いくつかのタイプのストリームに対して、例えばjava.nioの非ブロッキングチャネルのような異なるインタフェースを使用することが可能である。

最初の入力ストリームの完全な内容の後に2番目の入力ストリームが続くようにする場合:new java.io.SequenceInputStream(s1, s2)

+2

ああ、とてもいいです、私はちょうど新しいことを学びました。 SequenceInputStreamはCatInputStreamと本質的に同じですが、LinkedListではなく従来のEnumerationsを使用しています。 :-) –

+0

答えの最初の部分のハックとして、一般的なケースで解決するのは難しいですが、FileInputStreamの特定のケース(そしておそらくソケットですか?)では、インスタンス化/キャストしてチャンネルを作成することができます。 (他のストリームはChannels.newChannelを使用して一貫したインターフェイスを作成できますが、ノンブロッキングの資質は必要ありません) –

+0

Collections.enumerationはあなたの友人です。私は最初の部分の一部を忘れました - 編集します。 –

0

ありません。おそらく、2つのストリームの内容をバイト[]に読み込んだ後、そこからByteArrayInputStreamを作成する必要があります。

+0

シンプルでわかりやすい、実行可能なソリューションのためのUpvote。ブロックが重要な場合(またはブロックが巨大である場合)、必要な動作をしない可能性があります。 –

4

これを行うカスタムInputStreamの実装を記述することができます。例:

import java.io.IOException; 
import java.io.InputStream; 
import java.util.Collections; 
import java.util.Deque; 
import java.util.LinkedList; 

public class CatInputStream extends InputStream { 
    private final Deque<InputStream> streams; 

    public CatInputStream(InputStream... streams) { 
     this.streams = new LinkedList<InputStream>(); 
     Collections.addAll(this.streams, streams); 
    } 

    private void nextStream() throws IOException { 
     streams.removeFirst().close(); 
    } 

    @Override 
    public int read() throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read()) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public int read(byte b[], int off, int len) throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read(b, off, len)) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public long skip(long n) throws IOException { 
     long skipped = 0L; 
     while (skipped < n && !streams.isEmpty()) { 
      int thisSkip = streams.getFirst().skip(n - skipped); 
      if (thisSkip > 0) 
       skipped += thisSkip; 
      else 
       nextStream(); 
     } 
     return skipped; 
    } 

    @Override 
    public int available() throws IOException { 
     return streams.isEmpty() ? 0 : streams.getFirst().available(); 
    } 

    @Override 
    public void close() throws IOException { 
     while (!streams.isEmpty()) 
      nextStream(); 
    } 
} 

このコードはテストされていないため、マイレージが異なる場合があります。

+0

これはMerzbowの示唆しているように、SequenceInputStreamと同じことをしませんか? –

+0

申し訳ありませんが、タックラインではSequenceInputStreamが最初に提案されました(と私は彼のために+1しました)。 SOでは、最も初期の良い答えが勝ちます。後で答えが盗用であるかどうかは決して分かりません。また、SequenceInputStreamとCatInputStreamの比較のためのtacklineの答えに関する私のコメントを読んでください(私はCollections.enumerationの使用について彼の要点を思い出します)。 –

+0

私の最後のコメントのために私の答えを下落させた人がそうしたならば、私は謝ります。私はよりうまく説明する必要があります:前に投稿された回答の重複(またはサブセット)と判明した回答を書いた場合、私は通常それを削除します。 SO上では、実際には「西洋で最速の銃」(その質問タイトルを検索)です。 –

14

java.io.SequenceInputStreamは何が必要かもしれません。ストリームの列挙を受け入れ、すべてのストリームが空になるまで、最初のストリームの内容、次に2番目の内容などを出力します。ここで

0

は、バイト配列(独自のパッケージ定義を追加してください)に固有MVAR実装です。ここから、マージされたストリームに入力ストリームを書き込むのは簡単です。私はそれが要求された場合でも投稿できます。

import java.nio.ByteBuffer; 

public final class MVar { 

    private static enum State { 
    EMPTY, ONE, MANY 
    } 

    private final Object lock; 

    private State state; 

    private byte b; 

    private ByteBuffer bytes; 
    private int length; 

    public MVar() { 
    lock = new Object(); 
    state = State.EMPTY; 
    } 

    public final void put(byte b) { 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.b = b; 
     state = State.ONE; 
     lock.notifyAll(); 
    } 
    } 

    public final void put(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return; 
    } 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.bytes = ByteBuffer.allocateDirect(length); 
     this.bytes.put(bytes, offset, length); 
     this.bytes.position(0); 
     this.length = length; 
     state = State.MANY; 
     lock.notifyAll(); 
    } 
    } 

    public final byte take() { 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: { 
     state = State.EMPTY; 
     byte b = this.b; 
     lock.notifyAll(); 
     return b; 
     } 
     case MANY: { 
     byte b = bytes.get(); 
     state = --length <= 0 ? State.EMPTY : State.MANY; 
     lock.notifyAll(); 
     return b; 
     } 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 

    public final int take(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return 0; 
    } 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: 
     bytes[offset] = b; 
     state = State.EMPTY; 
     lock.notifyAll(); 
     return 1; 
     case MANY: 
     if (this.length > length) { 
      this.bytes.get(bytes, offset, length); 
      this.length = this.length - length; 
      synchronized (lock) { 
      lock.notifyAll(); 
      } 
      return length; 
     } 
     this.bytes.get(bytes, offset, this.length); 
     this.bytes = null; 
     state = State.EMPTY; 
     length = this.length; 
     lock.notifyAll(); 
     return length; 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 
} 
関連する問題