2017-03-24 12 views
0

のキーを変更した後にパーティーをしておいてください。残念ながらこれはダンプに関する質問ですが、私はSparkの新機能です。スパーク。

私はSparkでいくつかのグループ操作をしようとしています。私はRDDのキーを変更するときに余分なシャッフルを避けようとしています。

case class Key1 (a: String, b: String) 

val grouped1: RDD[(Key1, String)] = rdd1.keyBy(generateKey1(_)) 
val grouped2: RDD[(Key1, String)] = rdd2.keyBy(generateKey2(_)) 

val joined: RDD[(Key1, (String, String)) = groped1.join(grouped2) 

今私がキーに新しいフィールドを含めるといくつかの作業を軽減します:

オリジナルRDDSは私のコードは次のようになり、ロジックの簡素化、JSON文字列

です。キーは、パーティションが失われた変更、ように、操作はおそらくデータをシャッフルしますが、それは意味がありません削減しているとして、私は、間違っていないよ場合

case class key2 (a: String, b: String, c: String) 

val withNewKey: RDD[Key2, (String, String)] = joined.map{ case (key, (val1, val2)) => { 
    val newKey = Key2(key.a, key.b, extractWhatever(val2)) 
    (newKey, (val1, val2)) 
}} 

withNewKey.reduceByKey..... 

:だから私のようなものを持っていますキーは延長され、シャッフルは必要ありません。

何か不足していますか?シャッフルを避けるにはどうすればいいですか?

おかげ

答えて

2

あなたはtruepreservesPartitioningセットでmapPartitionsを使用することができます。

joined.mapPartitions(
    _.map{ case (key, (val1, val2)) => ... }, 
    true 
)