2017-03-11 12 views
0

私が扱う2つのキーと値のペアRDDのAとBがあります。のは、Bは10000行を持っており、私はその値によってBをソートしているとしましょう:ソートされたRDDを上位N行をフィルタリングする方法

B = B0.map(_.swap).sortByKey().map(_.swap) 

私は私が行うことができます知っているBからトップ5000を取り、Aと結合するためにそれを使用する必要があります。

B1 = B.take(5000) 

または

B1 = B.zipWithIndex().filter(_._2 < 5000).map(_._1) 

両方の計算をトリガするようです。 B1は中間結果に過ぎないので、実際の計算を引き起こさないようにしたいと思います。それを達成するより良い方法はありますか?

答えて

0

私が知る限り、RDDを使用してそれを達成する他の方法はありません。しかし、データフレームを活用して同じことを達成することができます。

  1. 最初にRDDをデータフレームに変換します。
  2. 次に、データフレームを5000値に制限します。
  3. 次に、データフレームから新しいRDDを選択できます。
  4. これまでのところ、sparkによって計算は行われません。

以下は、概念証明のサンプルです。

def main(arg: Array[String]): Unit = { 
    import spark.implicits._ 
    val a = 
     Array(
     Array("key_1", "value_1"), 
     Array("key_2", "value_2"), 
     Array("key_3", "value_3"), 
     Array("key_4", "value_4"), 
     Array("key_5", "value_5") 
    ) 

    val rdd = spark.sparkContext.makeRDD(a) 
    val df = rdd.map({ 
     case Array(key, value) => PairRdd(key, value) 
    }).toDF() 

    val dfWithTop = df.limit(3) 
    val rddWithTop = dfWithTop.rdd 
    // upto this point no computation has been triggered 
    // rddWithTop.take(100) will trigger computation 
    } 

    case class PairRdd(key: String, value: String) 
関連する問題