2016-07-14 9 views
15

私はRDDに加えてDataFramesとDataSetsの使用方法を学びたいと考えています。 RDDの場合、私はsomeRDD.reduceByKey((x,y) => x + y)を行うことができますが、私はデータセットのためのその機能は表示されません。だから私は1つ書くことにしました。Spark Datasetで独自のreduceByKeyをローリング

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => { 
    val result = mutable.HashMap.empty[(Long,Long),Int] 
    val keys = mutable.HashSet.empty[(Long,Long)] 
    y.keys.foreach(z => keys += z) 
    x.keys.foreach(z => keys += z) 
    for (elem <- keys) { 
    val s1 = if(x.contains(elem)) x(elem) else 0 
    val s2 = if(y.contains(elem)) y(elem) else 0 
    result(elem) = s1 + s2 
    } 
    result 
}) 

ただし、ドライバにすべてが返されます。 Datasetを返すにはどうすればいいですか?たぶんmapPartitionそれを行うか?それはMapまだ

+0

、Spark 2.0.0を試してみてください、あなたのデータセット.groupByKey(...)。reduceGroups(...) –

+6

触媒オプティマイザは、グループに続いて還元を行い、 「効率的」というのは、RDDでキーを減らしていくことが、グループを減らしていくよりも優れているということです。 –

答えて

18

ためのエンコーダを持っていないため

注これはコンパイルが、実行されません。私はあなたの目標は、データセットには、このイディオムを翻訳することであると仮定します。

rdd.map(x => (x.someKey, x.someField)) 
    .reduceByKey(_ + _) 

// => returning an RDD of (KeyType, FieldType) 

現在、最も近いソリューションIデータセットAPIで見つかったのは次のようなものです:

ds.map(x => (x.someKey, x.someField))   // [1] 
    .groupByKey(_._1)        
    .reduceGroups((a, b) => (a._1, a._2 + b._2)) 
    .map(_._2)         // [2] 

// => returning a Dataset of (KeyType, FieldType) 

// Comments: 
// [1] As far as I can see, having a map before groupByKey is required 
//  to end up with the proper type in reduceGroups. After all, we do 
//  not want to reduce over the original type, but the FieldType. 
// [2] required since reduceGroups converts back to Dataset[(K, V)] 
//  not knowing that our V's are already key-value pairs. 

クイックベンチマークによれば、それも非常によく見えませんここで何かが欠けているかもしれません...

注:代わりの方法は、最初の手順としてgroupByKey(_.someKey)を使用することです。問題は、groupByKeyを使用すると、タイプが通常のDatasetからKeyValueGroupedDatasetに変更されることです。後者には通常のmap機能はありません。代わりにmapGroupsを提供しています。これは、値をIteratorにラップして、docstringに従ってシャッフルを実行するので非常に便利ではありません。

+3

これはトリックです。ただし、reduceByKeyは、シャッフル前に各ノードで減少するため、より効率的です。最初にgroupByKeyを実行すると、すべての要素がシャッフルされてから縮小が始まります。だからこそ、それほどパフォーマンスが低いのです。私はreduceByKeyについて知る前にこれをやっていたのですが、私は忘れてしまいました:-) –

+0

@CarlosBribiescasデータセットがSparksのCatalyst Optimizerを利用していることを私は読んだことがあります。シャッフルする前に機能を減らす。これは 'Dataset' APIに' reduceByKey'がない理由を説明します。しかし、私の経験ではこれは当てはまりません。 'groupByKey.reduceGroups'はかなりのデータをシャッフルし、' reduceByKey'よりもかなり遅いです。 –

+4

reduceGroupsのパフォーマンスが2.0.1と2.1.0から修正されているようです。[Spark-16391](https://issues.apache.org/jira/browse/SPARK-16391) – Franzi

3

より効率的なソリューションは、シャッフルの量を減らすためにgroupByKeymapPartitionsを使用する(これはreduceByKeyとまったく同じ署名ではなく、私はデータセットは、タプルから成る必要以上の機能を通過することがより柔軟であると思います)。

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V) 
    (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = { 
    def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = { 
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator 
    } 
    ds.mapPartitions(h(f, g, _)) 
    .groupByKey(f)(encK) 
    .reduceGroups(g) 
} 

あなたのデータの形状/サイズに応じて、これは約2x早くgroupByKey(_._1).reduceGroupsとしてreduceByKeyの性能を1秒以内である、と。まだ改善の余地があるので、提案は大歓迎です。

関連する問題