2016-08-19 5 views
1

Spark(Scala)でデータを単純にグループ化する必要があります。特に、これは私の最初のデータである。Sparkでデータを効率的にグループ化する

1, a, X 
1, b, Y 
2, a, Y 
1, a, Y 

val seqs = Seq((1, "a", "X"),(1, "b", "Y"),(2, "a", "Y"),(1, "a", "Y")) 

次のように私は最初のキーによってグループにそれを必要とする:

1, (a, X), (b, Y), (a, Y) 
2, (a, Y) 

は私の最初のIDIAはDataFramegroupByを使用していたが、私はこのことを読みます操作は非常に高価であり、すべてのデータを完全に再シャッフルする必要があります。

したがって、グループ化を実行するためのより安価なオプションは何ですか?具体的な例が分かるだろう。

+0

しかしグループbyはまさにあなたが望むもので、並列処理はsparkの強みなので、groupByをRDDまたはDataframeにすることができます。もう一つの方法はReduceをはるかに効率的にすることですが、問題は最後にグループごとに1つのキー値のペアが残っていることです。 – GameOfThrows

+0

シャッフルせずにパーティション間で物事をグループ化することはできません。アイテムがどこに行く必要があるか考えてみてください。それは必然的に高価なので高価です。 –

答えて

1

は、あなたが潜在的にこのような何かを行うことができます:

val rdd = sc.parallelize(List((1, "a", "X"),(1, "b", "Y"),(2, "a", "Y"),(1, "a", "Y"))) 
    val mapping = rdd.map(x=>(x._1,List((x._2,x._3)))) 
    val result = mapping.reduceByKey((x,y) => (x ++ y)) 

これはreduceByKeyを使用していますが、すべての問題は、プロセスを減らすには、グループごとに1キーと値のペアで終わる必要があります。この場合、明示的に各値をListに変換する必要があるため、reduceプロセスはそれらをマージすることができます。あなたはまた、内部を使用する、combineByKeyを見て検討すること

は、プロセス

を減らす====== ====== EDIT

zero323は指摘し、ここに削減増加しないと効率とは逆に、プロセスはgroupByKeyの最適化を失います。

+0

ありがとうございます。ちょっとした質問です(私の説明では、この詳細が不足している可能性があります)。私の実際のデータは、RDD [String]という形式です。ですから、特定の値を得るために、 'val splitted = rdd.map(line => line.split("、 "))'を実行します。 '1'、' a'、 'X'。変数 'splitted'は' RDD [Array [String]] '型です。 'reduceByKey'を' splitted'にどのように適用すればよいですか? – HackerDuck

+0

@HackerDuck文字列の配列をケースクラスなどで(Int、String、String)構造体にマップする必要があります。変換するときにスキーマとしても機能するため、ケースクラスにマップするのが一般的ですそれはDataframeに(あなたが望むなら)それを入れます。 – GameOfThrows

+0

'(x ++ y)'はどういう意味ですか? – HackerDuck

関連する問題