を低減し、出力が多い[K、V〕ペア、例えば:地図と配列
input: array(1,2,3,4,5,6,7,8,9,7,12,11)
output: (1 => 2,3) (4 => 5,6)(7 => 8,9) (7 => 12,11)
そして、私はまた、キーによってこれらの対を削減したいです、たとえば、key = 7でデータを収集する場合、出力は(7=> 8,9,12,11).
ありがとうございます。
を低減し、出力が多い[K、V〕ペア、例えば:地図と配列
input: array(1,2,3,4,5,6,7,8,9,7,12,11)
output: (1 => 2,3) (4 => 5,6)(7 => 8,9) (7 => 12,11)
そして、私はまた、キーによってこれらの対を削減したいです、たとえば、key = 7でデータを収集する場合、出力は(7=> 8,9,12,11).
ありがとうございます。
は
val input = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 7, 12, 11)
val output = input.toSeq.grouped(3)
.map(g => (g.head, g.tail)).toList
.groupBy(_._1)
.mapValues(l => l.flatMap(_._2))
結果が
地図(4になり、次のさ - >リスト(8,9,12,11)、1 - >リスト(2,3))
groupped()の良いもの – BDR
こんにちは、入力アレイがRDDのパーティションである場合、パーティション内に[k、v]ペアを作成する必要があることを意味します。それは大丈夫ですか? – miaoiao
@miaoiao、あなたが何をしようとしているのかは分かりませんが、RDDでは達成するのが難しいでしょう。 RDDは、 "Resilient ** Distributed ** Dataset"と呼ばれています。これは、デフォルトでは順序付けやノード間の分割(例えば、2つのSpark-Node間で4分割された6要素のRDDと、そのような分割の後にあなたの操作は意味をなさない)。おそらく、以前のステップでデータを集約して、統一リストにダンプしてそこからグループ化しようとするべきでしょう。 – SergGr
はこのお試しください:
res0 = list.grouped(3).map {x => (x(0), List(x(1),x(2)))}.toList
// you must dump your converted data format into your storage eg hdfs.
// And not the entire thing in the form of array. Transform in form of
// (key,value) and dump in hdfs. That will save a lot of computation.
res1 = sc.parallelize(res0)
res2 = res1.reduceByKey(_++_).collect
をしかし、私は、このソリューションは次のようになり、どのくらいスケーラブルではありません確信しています。 >一覧(5、6)、7 - 私は何が必要だと思う
EDIT
val res1 = sc.parallelize(arr)
// (1,2,3,4,5,6,7,8,9,7,12,11)
val res2 = res1.zipWithIndex.map(x._2/3,List(x._1))
// (1,0),(2,1),...(12,10),(11,11) -> (0,1),(0,2),(0,3),(1,4),(1,5),(1,6)
val res3 = res2.reduceByKey(_++_).map(_._2)
//(0,List(1,2,3)),(1,List(4,5,6)) -> List(1,2,3),List(4,5,6)
val res4 = res3.map(x => x match {
case x1::xs => (x1,xs)
}).reduceByKey(_++_)
//List(1,2,3) - > (1,List(2,3)) -> reduceByKey
//(1,List(2,3)),(4,List(5,6)),(7,List(8,9,12,11))
3つのすべてのペア( 'i + = 3')に対して単純に反復処理を行い、値を追加するためにマルチマップを使用するのが良いでしょう。通常、キー/バリューストアでは、キー値が重複していることはありません。 – Rogue
ご質問はありましたか? "私は欲しい"という質問はない。 –
この質問はApache Sparkにどのように関連していますか? –