2017-04-04 18 views
3

Java 8コレクションは、コレクションをストリームとして取得する機能を提供します。しかし、いったんstream()メソッドを呼び出すと、コレクションの現在の内容をストリームとして取得します。 ストリーム処理中にコレクションが大きくなった場合はどうなりますか?ストリーム上の操作によって、コレクションがより多くのデータで更新される可能性があります。この状況に対処するには、単純な&効果的な方法がありますか?ダイナミックコレクションをストリームとして扱うにはどうすればいいですか?

(Iストリーム処理操作の中から)(Stream.concatを試みたが、私は例外を取得:スレッドの例外「メイン」java.lang.IllegalStateException:ストリームが既にオペレートまたは閉鎖された)

は、撮影具体的な例として、URLの並行キューがあるとします。

Queue<Url> concurrentUrlQue= initUrlQueue(); 

ここで、このURLキューのストリームを取得し、そのURLを1つずつ処理します。このプロセスでは、キューからURLを削除し、URLが指すWebページを読み込み、ページからURLを抽出し、それらのURLを並行キューに追加します。

concurrentUrlQue.stream().forEach((url)->readAndExtractUrls(url, concurrentUrlQue)); 

私は上記の動的に成長するキューをストリームとして処理できるようにします。 (さらに、この動的キューを並列ストリームで処理できるようにしたい)

Javaストリームを使用してこれを実現する簡単な方法はありますか?

+0

あなたは(http://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html)[ 'java.util.Spliterator'のJavadocを]相談してください。それは長いですが、あなたの質問に答えてください。 – glee8e

+0

私はそれを通過しますが、spliteratorが私が解決しようとしている問題を解決する方法を私に示唆することができますか? – RajatJ

+0

さらに多くのURLを生成する可能性のあるURLがあります。これらのURLはすべて同じコレクション内にありますか?それはあなたが並行して実行することができる再帰のいくつかの並べ替えをしたいように聞こえる。 Spliteratorは興味深いように聞こえる。 –

答えて

4

新しい要素を待ってブロックするスプライテータを書く必要があります。

class QueueSpliterator<T> extends Spliterators.AbstractSpliterator<T> { 

    private final BlockingQueue<T> queue; 

    public QueueSpliterator(BlockingQueue<T> queue) { 
    super(Long.MAX_VALUE, 0); 
    this.queue = queue; 
    } 

    public boolean tryAdvance(Consumer<? super T> action) { 
    try { 
     T element = queue.take(); 
     action.accept(element); 
     return true; 
    } catch (InterruptedException e) { 
     return false; 
    } 
    } 
} 

次に、そのスプライテータを使用してストリームを作成し、通常の無限ストリームのように処理します。

public class Main { 
    public static void main(String... args) { 
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000); 

    new Thread(() -> { 
     for (int i = 0; i < 1000; ++i) { 
     try { 
      queue.put(i); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 
     } 
    }).start(); 


    Spliterator<Integer> queueSpliterator = new QueueSpliterator<>(queue); 
    Stream<Integer> stream = StreamSupport.stream(queueSpliterator, false); 

    stream.forEach(System.out::println); 
    } 
} 
+0

これは機能します。また、これは私にとって素晴らしい学習の練習です。 – RajatJ

+0

驚くべき答えはありがとうございます。 – jophde

関連する問題