2017-02-01 18 views
1

Java 8ストリームは再利用を許可していません。これは、x(i)* x(i-1)のような関係を計算するためにスライディングウィンドウの磁束を作成するときにストリームを再利用する方法についてのパズルを作成します。Project Reactor - Java 8ストリームからFluxを作成する方法

次のコードは、シフト演算子の考え方に基づいています。最初のストリームをskip(1)でシフトして2番目のストリームを作成します。ここで

Flux<Integer> primary = Flux.fromStream(IntStream.range(1, 10).boxed()); 
Flux<Integer> secondary = primary.skip(1); 
primary.zipWith(secondary) 
     .map(t -> t.getT1() * t.getT2()) 
     .subscribe(System.out::println); 

上記のコードを視覚的に表現される:

1 2 3 4 5 6 7 8 9 10 
v v v v v v v v v v skip(1) 
2 3 4 5 6 7 8 9 10 
v v v v v v v v v v zipWith 
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9 10 <- sliding window of length 2 
v v v v v v v v v v multiples 
2 6 12 20 30 42 56 72 90 

残念ながら、このコードのエラーなど:明白な回避策は、要素をキャッシュして確保することである

java.lang.IllegalStateException: stream has already been operated upon or closed 

キャッシュサイズがストリームサイズ以上である場合:

Flux<Integer> primary = Flux.fromStream(IntStream.range(1, 10).boxed()).cache(10); 

またはストリームの交換を使用する:

Flux<Integer> primary = Flux.range(0, 10); 

だけスキップ(1)シーケンスの元の順序を再実行する第二の溶液。

ストリームは、大きなファイルであることを起こる場合は、効率的な解決策が唯一これは大したことでサイズ2のバッファが必要です。

Files.lines(Paths.get(megaFile)); 

どのように私は効率的ので、複数のサブスクリプションをストリームをバッファリングすることができます一次Fluxがすべてをメモリに読み込ませたり、再実行させたりすることはありませんか?

答えて

2

私は最終的にはバッファ指向ではありませんが、解決策を発見しました。

Flux<Integer> primary = Flux.fromStream(IntStream.range(0, 10).boxed()); 
primary.flatMap(num -> Flux.just(num, num)) 
    .skip(1) 
    .buffer(2) 
    .filter(list -> list.size() == 2) 
    .map(list -> Arrays.toString(list.toArray())) 
    .subscribe(System.out::println); 

プロセスの視覚的な表現は、以下:そして

[0, 1] 
[1, 2] 
[2, 3] 
[3, 4] 
[4, 5] 
[5, 6] 
[6, 7] 
[7, 8] 
[8, 9] 

:これは、出力され

1 2 3 4 5 6 7 8 9 
V V V V V V V V V Flux.just(num, num) 
1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 
V V V V V V V V V skip(1) 
1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9 
V V V V V V V V V bufffer(2) 
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9, 9 
V V V V V V V V V filter 
1 2, 2 3, 3 4, 4 5, 5 6, 6 7, 7 8, 8 9 

インスピレーションは、第2のスライディングウィンドウの問題を解決することでした私は任意のスライディングウインドウサイズの解を作成するために上記の考えを一般化した:

public class SlidingWindow { 

    public static void main(String[] args) { 
     System.out.println("Different sliding windows for sequence 0 to 9:"); 
     SlidingWindow flux = new SlidingWindow(); 
     for (int windowSize = 1; windowSize < 5; windowSize++) { 
      flux.slidingWindow(windowSize, IntStream.range(0, 10).boxed()) 
       .map(SlidingWindow::listToString) 
       .subscribe(System.out::print); 
      System.out.println(); 
     } 

     //show stream difference: x(i)-x(i-1) 
     List<Integer> sequence = Arrays.asList(new Integer[]{10, 12, 11, 9, 13, 17, 21}); 
     System.out.println("Show difference 'x(i)-x(i-1)' for " + listToString(sequence)); 
     flux.slidingWindow(2, sequence.stream()) 
      .doOnNext(SlidingWindow::printlist) 
      .map(list -> list.get(1) - list.get(0)) 
      .subscribe(System.out::println); 
     System.out.println(); 
    } 

    public <T> Flux<List<T>> slidingWindow(int windowSize, Stream<T> stream) { 
     if (windowSize > 0) { 
      Flux<List<T>> flux = Flux.fromStream(stream).map(ele -> Arrays.asList(ele)); 
      for (int i = 1; i < windowSize; i++) { 
       flux = addDepth(flux); 
      } 
      return flux; 
     } else { 
      return Flux.empty(); 
     } 
    } 

    protected <T> Flux<List<T>> addDepth(Flux<List<T>> flux) { 
     return flux.flatMap(list -> Flux.just(list, list)) 
      .skip(1) 
      .buffer(2) 
      .filter(list -> list.size() == 2) 
      .map(list -> flatten(list)); 
    } 

    protected <T> List<T> flatten(List<List<T>> list) { 
     LinkedList<T> newl = new LinkedList<>(list.get(1)); 
     newl.addFirst(list.get(0).get(0)); 
     return newl; 
    } 

    static String listToString(List list) { 
     return list.stream() 
      .map(i -> i.toString()) 
      .collect(Collectors.joining(", ", "[ ", " ], ")) 
      .toString(); 
    } 

    static void printlist(List list) { 
     System.out.print(listToString(list)); 
    } 

} 

上記のコードの出力は次のとおりです。

Different sliding windows for sequence 0 to 9: 
[ 0 ], [ 1 ], [ 2 ], [ 3 ], [ 4 ], [ 5 ], [ 6 ], [ 7 ], [ 8 ], [ 9 ], 
[ 0, 1 ], [ 1, 2 ], [ 2, 3 ], [ 3, 4 ], [ 4, 5 ], [ 5, 6 ], [ 6, 7 ], [ 7, 8 ], [ 8, 9 ], 
[ 0, 1, 2 ], [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ], [ 4, 5, 6 ], [ 5, 6, 7 ], [ 6, 7, 8 ], [ 7, 8, 9 ], 
[ 0, 1, 2, 3 ], [ 1, 2, 3, 4 ], [ 2, 3, 4, 5 ], [ 3, 4, 5, 6 ], [ 4, 5, 6, 7 ], [ 5, 6, 7, 8 ], [ 6, 7, 8, 9 ], 

Show difference 'x(i)-x(i-1)' for [ 10, 12, 11, 9, 13, 17, 21 ], 
[ 10, 12 ], 2 
[ 12, 11 ], -1 
[ 11, 9 ], -2 
[ 9, 13 ], 4 
[ 13, 17 ], 4 
[ 17, 21 ], 4 
関連する問題