2016-09-02 8 views
-1

私は非常に基本的な質問謝罪を求めているかもしれませんが、インターネット上で答えが見つからないことがあります。私は、RDDをペアにして、aggragateByKeyのようなものを使用し、すべての値をキーで連結したいと考えています。インプットRDDで最初に発生する値は、アグレゲートRDDで最初に来るはずです。 Apache Spark Scala:キーでrddをグループ化しながら値の順序を維持する方法

Input RDD [Int, Int] 
2 20 
1 10 
2 8 
2 25 

Output RDD (Aggregated RDD) 
2 20 8 25 
1 10 

は、私は両方の出力に含ま私を与えているが、値の順序が維持されていない、aggregateByKeyとgropByKeyを試してみました。だからこそ何かを提案してください。確かに groupByKeyaggregateByKey以来

+0

あなたはしようとすることができます集計後に各コレクションを並べ替えます。あなたはそれを試しましたか? – eliasah

+0

私はそれを並べ替えることができます、私はid descまたはascを必要としません。私は入力rddに基づいて注文する必要があります。 –

+0

groupByKeyを実行すると、RDD [(Int、Iterable [Int])]が表示されます。変換を使用してRDD値をマップし、それらを配列に変換し、その配列をソートすることができます。 – eliasah

答えて

1

は順序を保持することはできません - あなたは、グループ化した後に自分自身をほのめかしていることにより、注文することができるように人為的に各レコードに「ヒント」を追加する必要があります:

val input = sc.parallelize(Seq((2, 20), (1, 10), (2, 8), (2, 25))) 

val withIndex: RDD[(Int, (Long, Int))] = input 
    .zipWithIndex() // adds index to each record, will be used to order result 
    .map { case ((k, v), i) => (k, (i, v)) } // restructure into (key, (index, value)) 

val result: RDD[(Int, List[Int])] = withIndex 
    .groupByKey() 
    .map { case (k, it) => (k, it.toList.sortBy(_._1).map(_._2)) } // order values and remove index 
関連する問題