2015-09-07 15 views
32

Java 8 Streamで「パーティション」操作を実装する方法は?パーティションとは、ストリームを所定のサイズのサブストリームに分割することです。どうにかしてGuava Iterators.partition()メソッドと同じになりますが、パーティションは、Listではなく遅延評価されたストリームであることが望ましいです。Java 8ストリームを分割する

+6

遅延評価のパーティションを作るには、私の経験では、一般的に実行不可能である - あなたは、いくつかのパーティションへの参照を維持した場合に起こることを期待して、どうなりますかそれらを順番にアクセスしましたか? –

+3

@JonSkeet - 特に平行である場合。 – OldCurmudgeon

+0

あなたの点をありがとう、ジョン、私はそれを疑った。私自身の答えで非怠惰な実装が最適だと思いますか? – Trader001

答えて

25

これは、並列処理を台無しにするので、それは、固定サイズのバッチに任意のソースストリームを分割することは不可能です。並列処理すると、分割後の最初のサブタスクの要素数が分からないことがあるため、最初のサブタスクが完全に処理されるまで、次のサブタスクのパーティションを作成することはできません。

ただし、ランダムアクセスListからパーティションのストリームを作成することは可能です。このような特徴は、私のStreamExライブラリに、例えば、提供されています:

List<Type> input = Arrays.asList(...); 

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize); 

それとも、本当にストリームのストリームをしたい場合:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream); 

サードパーティのライブラリに依存したくない場合、手動で、このようなofSubListsメソッドを実装することができます

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) { 
    if (length <= 0) 
     throw new IllegalArgumentException("length = " + length); 
    int size = source.size(); 
    if (size <= 0) 
     return Stream.empty(); 
    int fullChunks = (size - 1)/length; 
    return IntStream.range(0, fullChunks + 1).mapToObj(
     n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); 
} 

をこの実装は少し長くなりますが、それは考慮に入れCLOのようないくつかのコーナーケースを取りますse-to-MAX_VALUEリストサイズ


あなたは(あなたが要素が単一のバッチで結合されるストリーム気にしない)順不同ストリームの並列フレンドリーなソリューションをしたい場合は、(おかげでインスピレーションを得るため@sibnickする)このようにコレクタを使用することができます:

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
        Collector<List<T>, A, R> downstream) { 
    class Acc { 
     List<T> cur = new ArrayList<>(); 
     A acc = downstream.supplier().get(); 
    } 
    BiConsumer<Acc, T> accumulator = (acc, t) -> { 
     acc.cur.add(t); 
     if(acc.cur.size() == batchSize) { 
      downstream.accumulator().accept(acc.acc, acc.cur); 
      acc.cur = new ArrayList<>(); 
     } 
    }; 
    return Collector.of(Acc::new, accumulator, 
      (acc1, acc2) -> { 
       acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc); 
       for(T t : acc2.cur) accumulator.accept(acc1, t); 
       return acc1; 
      }, acc -> { 
       if(!acc.cur.isEmpty()) 
        downstream.accumulator().accept(acc.acc, acc.cur); 
       return downstream.finisher().apply(acc.acc); 
      }, Collector.Characteristics.UNORDERED); 
} 

使用例:

List<List<Integer>> list = IntStream.range(0,20) 
            .boxed().parallel() 
            .collect(unorderedBatches(3, Collectors.toList())); 

結果:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]] 

このようなコレクタは完全にスレッドセーフであり、シーケンシャルストリーム用の順序付けられたバッチを生成します。

:あなたはその場ですべてのバッチで数字を合計することができ、この方法を例

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
     Collector<T, AA, B> batchCollector, 
     Collector<B, A, R> downstream) { 
    return unorderedBatches(batchSize, 
      Collectors.mapping(list -> list.stream().collect(batchCollector), downstream)); 
} 

:あなたはすべてのバッチの中間変換を適用したい場合は

、次のバージョンを使用することができます

List<Integer> list = IntStream.range(0,20) 
     .boxed().parallel() 
     .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
      Collectors.toList())); 
+0

私はStreamExに追加された並列ソリューション(GuavaとLombokのやり方のように私のプロジェクトでは定番となっている)のようなものを見ることに非常に興味があります。私はParalellismを気にするのではなく、代わりにストリームで動作するため、StreamEx.ofSubListsは既にリストを折りたたんでいる必要がありますが、私のユースケースは通常、ストリームになります。一斉に。 – Torque

3

Jon Skeetが彼のcommentで示したように、パーティションを怠けることはできません。非怠惰なパーティションの場合、私はすでにこのコードを持っている:

public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) { 
    final Iterator<T> it = source.iterator(); 
    final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream); 
    final Iterable<Stream<T>> iterable =() -> partIt; 

    return StreamSupport.stream(iterable.spliterator(), false); 
} 
+10

私はそれが古い話題だと知っていますが、それは言及する価値があると思います。純粋なJava 8ではありません: 'Iterators'クラスはGuavaからです。 –

0

私はそれが内部のハックのいくつかの並べ替えで可能だと思う:

は、バッチのためのユーティリティクラスを作成します。

public static class ConcurrentBatch { 
    private AtomicLong id = new AtomicLong(); 
    private int batchSize; 

    public ConcurrentBatch(int batchSize) { 
     this.batchSize = batchSize; 
    } 

    public long next() { 
     return (id.getAndIncrement())/batchSize; 
    } 

    public int getBatchSize() { 
     return batchSize; 
    } 
} 

と方法:あなたが順次ストリームを使用したい提供

public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){ 
    ConcurrentBatch batch = new ConcurrentBatch(batchSize); 
    //hack java map: extends and override computeIfAbsent 
    Supplier<ConcurrentMap<Long, List<T>>> mapFactory =() -> new ConcurrentHashMap<Long, List<T>>() { 
     @Override 
     public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) { 
      List<T> rs = super.computeIfAbsent(key, mappingFunction); 
      //apply batchFunc to old lists, when new batch list is created 
      if(rs.isEmpty()){ 
       for(Entry<Long, List<T>> e : entrySet()) { 
        List<T> batchList = e.getValue(); 
        //todo: need to improve 
        synchronized (batchList) { 
         if (batchList.size() == batch.getBatchSize()){ 
          batchFunc.accept(batchList); 
          remove(e.getKey()); 
          batchList.clear(); 
         } 
        } 
       } 
      } 
      return rs; 
     } 
    }; 
    stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s)) 
      .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList()))) 
      .entrySet() 
      .stream() 
      //map contains only unprocessed lists (size<batchSize) 
      .forEach(e -> batchFunc.accept(e.getValue())); 
} 
+0

あなたは文書化されていない事実については正しいので、私はそれを「ハック」と呼んでいます。また、あなたは非原子的な 'computeIfAbsent'について正しいです。私はすぐにコードを編集します。しかし、なぜそれは怠惰ではないのですか? 1つのバッチを処理する前にすべてのリストを割り当てるわけではありません。並行バッチ処理は順序付けされていないことも一般的です。 – sibnick

+1

パラレルストリームの場合は、まったく動作しません。 'applyConcurrentBatchToStream(System.out :: println、IntStream.range(0,100).boxed()。parallel()、3) 'ガベージ(ランダムに収集されたグループ、いくつかの要素が繰り返されます。シーケンシャル専用のストリームの場合、はるかに簡単で効率的なソリューション(OPによって提示されたものなど)があります。 –

+0

しかし、あなたはまた、バグのソースを非原子的な 'computeIfAbsent'と表示します。 – sibnick

6

、ストリームを分割(ならびにウィンドウなどの関連機能を実行することが可能である - 私はあなたが本当に欲しいものだと思ういますこの場合)。 標準ストリームのパートタイニングをサポートする2つのライブラリは、cyclops-react(私は作者)とjOOλ(サイクリック・レスポンスはウィンドウ・イングなどの機能を追加する)です。

cyclops-streamsには、Javaストリームで動作する静的関数StreamUtilsと、splitAt、headAndTail、splitBy、パーティション分割などの一連の関数があります。

ストリームをサイズ30のネストされたストリームのストリームにウィンドウするには、ウィンドウメソッドを使用できます。

OPのポイントには、ストリーミング用語では、ストリームを所定のサイズの複数のストリームに分割することは、分割操作ではなくウィンドウ操作です。

Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30); 

コード少しきれいにすることがjool.Seqを拡張し、ウィンドウイング機能を追加しますReactiveSeqと呼ばれるストリームの拡張クラスは、あります。

ReactiveSeq<Integer> seq; 
    ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30); 

Tagirは上で指摘したように、これは並列ストリームには適していません。マルチスレッドの方法で実行したいストリームをウインドウまたはバッチしたい場合。 cyclops-reactのLazyFutureStreamは役に立つかもしれません(ウィンドウ処理は予定リストにありますが、今は普通のバッチ処理が利用可能です)。

この場合、ストリームを実行する複数のスレッドからマルチプロデューサ/シングルコンシューマの待機フリーキューにデータが渡され、そのキューのシーケンシャルデータをウィンドウに戻して再びスレッドに配信できます。

Stream<List<Data>> batched = new LazyReact().range(0,1000) 
               .grouped(30) 
               .map(this::process); 
0

はここAbacusUtil

IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray())); 

宣言することにより、迅速なソリューションです:私はAbacusUtilの開発者です。

0

私が見つけたこの問題の最もエレガントで純粋なJava 8ソリューション:

public static <T> List<List<T>> partition(final List<T> list, int batchSize) { 
return IntStream.range(0, getNumberOfPartitions(list, batchSize)) 
       .mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size()))) 
       .collect(toList()); 
} 

//https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java 
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) { 
    return (list.size() + batchSize- 1)/batchSize; 
}