2011-09-12 18 views
3

Iterator[A]のサイズは無限です)とし、Iterator[B]からタイプAの後続の値が集計されたとします。Scalaに集約パターンを書くには?

例: は、私は文字列のリストを持っている:

Iterator(
    "START", 
    "DATA1", 
    "DATA2", 
    "DATA3", 
    "START", 
    "DATA1", 
    "DATA2", 
    //.. 10^10 more records 
) 

私は除いて、次の開始にSTARTから文字列に参加したいです。私。パーザを書く。

Iterator(
"START DATA1 DATA2 DATA3", 
"START DATA1 DATA2", 
    //.. 10^10/5 more records 
) 

私はこれを絶対に行う方法を知っていますが、私はスカラ高次関数でそれを達成したいと思います。何か案は?

PS EIP集約http://camel.apache.org/aggregator2.html

答えて

5

(ストリームは不変です)。それは期待どおりに動作

def aggregate(strs: Stream[String]) = { 
    aggregateRec(strs) 
} 

def aggregateRec(strs: Stream[String]): Stream[String] = { 
    val tail = strs.drop(1) 
    if(tail.nonEmpty) { 
    val (str, rest) = accumulate(tail) 
    Stream.cons(str, aggregateRec(rest)) 
    } 
    else Stream.empty 
} 

def accumulate(strs: Stream[String]): (String, Stream[String]) = { 
    val first = "START " + strs.takeWhile(_ != "START").mkString(" ") 
    val rest = strs.dropWhile(_ != "START") 
    (first, rest) 
} 

:ストリームに

val strs = Stream("START", "1", "2", "3", "START", "A", "B") 
val strs2 = aggregate(strs) 
strs2 foreach println 
+1

私はちょっと混乱しています。イテレータの基礎となる実装がストリームの場合、なぜそれで十分ではないでしょうか?つまり、私たちがIteratorを使うことができるときにストリームを明示的に使用するのはなぜでしょうか?そのIteratorがStreamを繰り返し処理する場合はどうでしょうか? –

+1

イテレータは変更可能です。 'next'を呼び出すと、反復子の状態が変わります。私はちょうど完全に機能的なソリューションを提供したかった。 – paradigmatic

+0

ああ、十分です。どうも。私は、もっと邪悪なものがないことを確かめていました。 –

1

あなたは倍でそれを試みることができる:

val ls = List(
    "START", 
    "DATA1", 
    "DATA2", 
    "DATA3", 
    "START", 
    "DATA1", 
    "DATA2" 
) 

(List[List[String]]() /: ls) { (acc, elem) => 
    if (elem == "START") 
    List(elem) :: acc // new head list 
    else 
    (elem :: acc.head) :: acc.tail // prepend to current head list 
} map (_.reverse mkString " ") reverse; 
+0

クール、私はLSは無限ストリーム/イテレータで制限を追加しました。折りたたみはここでは機能しません – yura

+0

質問が尋ねられた後、さらに制限を加えてクールではない... –

+0

申し訳ありません、それは実際に私の問題だったので、私はそれらを追加しました。私は非常に大きなログファイルのパーサーを作成したい。私はちょうどそれが私にはっきりしていることを忘れた – yura

5

まあ、無限ストリームではなく、劇的なものに変更されます。私はあなたの状況の残りの部分を理解すると仮定すると、これは動作するはずです:

def aggregate(it: Iterator[String]) = new Iterator[String] { 
    if (it.hasNext) it.next 
    def hasNext = it.hasNext 
    def next = "START " + (it.takeWhile(_ != "START")).mkString(" ") 
} 

あなたができるように:あなたは、機能ソリューションをしたい場合は、ストリームではなく、イテレータを使用する必要があります

val i = aggregate(yourStream.iterator) 
i.take(20).foreach(println) // or whatever 
0

:ここでは一つの可能​​なアプローチだ

object Iter { 
    def main(args: Array[String]) { 
    val es = List("START", "DATA1", "DATA2", "START", "DATA1", "START") 
    val bit = batched(es.iterator, "START") 
    println(bit.head.toList) 
    println(bit.tail.head.toList) 
    } 

    def batched[T](it: Iterator[T], start: T) = { 
    def nextBatch(): Stream[List[T]] = { 
     (it takeWhile { _ != start }).toList match { 
     case Nil => nextBatch() 
     case es => Stream.cons(start :: es, nextBatch()) 
     } 
    } 
    nextBatch() 
    } 

} 
関連する問題