2013-06-14 8 views
7

でストリームを分割:は、私がそこにあればそのような何かを達成するためのエレガントな方法を知りたいのですが、多くの

val l = Stream.from(1) 

val parts = l.some_function(3) //any number 

parts.foreach(println(_)) 

> 1,4,7,10... 
> 2,5,8,11... 
> 3,6,9,12... 

実は私は並列化のためのストリーム上にそのような操作を必要とする - ことなく、複数のアクター間でデータを分割しますすべてのものをメモリにロードする。

答えて

4

答えはSplit a scala list into n interleaving listsです。ストリームに合うように少し変更されています。ストリーム:

def round[A](seq: Iterable[A], n: Int) = { 
    (0 until n).map(i => seq.drop(i).sliding(1, n).flatten) 
} 
round(Stream.from(1),3).foreach(i => println(i.take(3).toList)) 
List(1, 4, 7) 
List(2, 5, 8) 
List(3, 6, 9) 
2

私は考えることができる唯一のこと:サブストリームのそれぞれが完全にメインストリームを通過しなければならないので、それは一種の醜い

def distribute[T](n: Int)(x: Stream[T]) = (0 until n).map { p => 
    x.zipWithIndex.collect { 
    case (e,i) if i % n == p => e 
    } 
} 

。しかし、私はあなたが不変性を(明らかに)保持しながらそれを緩和できるとは思わない。

あなたは個々のタスクを俳優に派遣し、まさにこれを行う「タスクディストリビューター」を持つことを考えましたか?

+0

はい、私はそれを考えました。私は俳優からの結果をマージする必要があり、問題は中間結果も多くのメモリを消費することであり、私はいくつかのアクターと同じ数のタスク/結果が必要であることを望んでいます。しかし、それにもかかわらず、以前のタスクの結果を再利用するためにアクターを改装することができました。ストリームを分割する簡単な方法がない場合は、この方法を採用します。 –

0
scala> (1 to 30 grouped 3).toList.transpose foreach println 
List(1, 4, 7, 10, 13, 16, 19, 22, 25, 28) 
List(2, 5, 8, 11, 14, 17, 20, 23, 26, 29) 
List(3, 6, 9, 12, 15, 18, 21, 24, 27, 30) 
+0

これはStreamsでうまく動作しますか? – gzm0

+0

'toList'を' toStream'に変更して自分で見つけてください... – sschaef

+1

'Stream.from(1).grouped(3).toStream.transpose foreach println'は無限ループでハングします... – gzm0

2

単純なアプローチでは、必要なインデックスの算術シーケンスを生成し、それをストリームにマッピングします。適用方法は、対応する値を引き出すであろう:

def f[A](s:Stream[A], n:Int) = 
    0 until n map (i => Iterator.iterate(0)(_+n) map (s drop i)) 

f(Stream from 1, 3) map (_ take 4 mkString ",") 
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12) 

次の方法単純算術シーケンス内の次のインデックスでストリームから値を返すイテレータ採用するであろうよりパフォーマンス溶液:

def comb[A](s:Stream[A], first:Int, step:Int):Iterator[A] = new Iterator { 
    var i  = first - step 
    def hasNext = true 
    def next = { i += step; s(i) } 
} 
def g[A](s:Stream[A], n:Int) = 
    0 until n map (i => comb(s,i,n)) 

g(Stream from 1, 3) map (_ take 4 mkString ",") 
// Vector(1,4,7,10, 2,5,8,11, 3,6,9,12) 

あなたはこれが俳優向けだと言いましたが、もしこれがAkkaなら、round-robin routerを使うことができます。

更新:上記(明らかに間違っている)は、プログラムが実行されている限り多くの作業があると想定しているため、hasNextは常にtrueを返します。有限ストリームでも動作するバージョンのMikhailの答えを参照してください。

更新:Mikhailは、this answer to a prior StackOverflow questionは実際に有限ストリームと無限ストリームに効果があると判断しました(ただし、ほぼイテレータと同様に動作しないようです)。

+0

イテレーターの作成がうまくいきます。あなたの実装でhasNextが常にtrueを返すのは唯一のことです。無限のコレクションだけを扱いますが、一般的なケースではコードはより複雑になります。 標準のScalaライブラリのアクタを使用しましたが、Akkaはそれを学ぶ価値があるようです。 –

+0

有限の場合はhttp://stackoverflow.com/questions/11132788/split-a-scala-list-into-n-interleaving-lists?lq=1も参照してください。 – AmigoNico

+0

おっと!ステップを備えた「滑り」機能がこのトリックをしました。ストリームにも適しています。したがって、カスタムイテレータの作成を避けることができました。 –

0

Scalaライブラリでこのような機能が見つかりませんでした。そのため、AmigoNicoの回答のイテレータを改訂しました。このコードは、有限集合と無限集合の両方を扱います。無限場合

def splitRoundRobin[A](s: Iterable[A], n: Int) = { 
    def comb[A](s: Iterable[A], first: Int, step: Int): Iterator[A] = new Iterator[A] { 
     val iter = s.iterator 
     var nextElem: Option[A] = iterToNext(first) 
     def iterToNext(elemsToSkip: Int) = { 
     iterToNextRec(None, elemsToSkip) 
     } 
     def iterToNextRec(next: Option[A], repeat: Int): Option[A] = repeat match { 
     case 0 => next 
     case _ => if (iter.hasNext) iterToNextRec(Some(iter.next()), repeat - 1) else None 
     } 
     def hasNext = nextElem.isDefined || { 
     nextElem = iterToNext(step) 
     nextElem.isDefined 
     } 
     def next = { 
     var result = if (nextElem.isDefined) nextElem.get else throw new IllegalStateException("No next") 
     nextElem = None 
     result 
     } 
    } 
    0 until n map (i => comb(s, i, n)) 
    } 

    splitRoundRobin(1 to 12 toStream, 3) map (_.toList.mkString(",")) 
// Vector(3,6,9,12, 1,4,7,10, 2,5,8,11) 

    splitRoundRobin(Stream from 1, 3) map (_.take(4).mkString(",")) 
//> Vector(3,6,9,12, 1,4,7,10, 2,5,8,11) 
0
def roundRobin[T](n: Int, xs: Stream[T]) = { 
    val groups = xs.grouped(n).map(_.toIndexedSeq).toStream 
    (0 until n).map(i => groups.flatMap(_.lift(i))) 
} 

作品:代わり無地map/applyflatMap/lift入力は有限であり、長さが複数ない場合でも、それが動作する手段を用い

scala> roundRobin(3, Stream.from(0)).map(_.take(3).force.mkString).mkString(" ") 
res6: String = 036 147 258 

n:

scala> roundRobin(3, Stream.from(0).take(10)).map(_.mkString).mkString(" ") 
res5: String = 0369 147 258 
関連する問題