2017-10-19 16 views
-1

をに変換しようとしています。これはgroupByKeyメソッドから返された出力と同じです。 groupByKeyは効率的ではないため、代わりにcombineByKeyをRDDに使用しようとしていますが、動作しません。以下は、使用されるコードは次のとおりです。combineByKeyを使用して(キー、反復可能な[値])として出力を取得します。

val data= List("abc,2017-10-04,15.2", 
      "abc,2017-10-03,19.67", 
      "abc,2017-10-02,19.8", 
      "xyz,2017-10-09,46.9", 
      "xyz,2017-10-08,48.4", 
      "xyz,2017-10-07,87.5", 
      "xyz,2017-10-04,83.03", 
      "xyz,2017-10-03,83.41", 
      "pqr,2017-09-30,18.18", 
      "pqr,2017-09-27,18.2", 
      "pqr,2017-09-26,19.2", 
      "pqr,2017-09-25,19.47", 
      "abc,2017-07-19,96.60", 
      "abc,2017-07-18,91.68", 
      "abc,2017-07-17,91.55") 
val rdd = sc.parallelize(templines) 
val rows = rdd.map(line => { 
    val row = line.split(",") 
    ((row(0), row(1)), row(2)) 
}) 

// re partition and sort based key  
val op = rows.repartitionAndSortWithinPartitions(new CustomPartitioner(4)) 
val temp = op.map(f => (f._1._1, (f._1._2, f._2))) 

val mergeCombiners = (t1: (String, List[String]), t2: (String, List[String])) => 
    (t1._1 + t2._1, t1._2.++(t2._2)) 
val mergeValue = (x: (String, List[String]), y: (String, String)) => { 
    val a = x._2.+:(y._2) 
    (x._1, a) 
} 

// createCombiner, mergeValue, mergeCombiners 
val x = temp.combineByKey(
    (t1: String, t2: String) => (t1, List(t2)), 
    mergeValue, 
    mergeCombiners) 

temp.combineByKeyは時間のコンパイルエラー与えている、私はそれを得ることができないのです。

+0

何をしようとしていますか?与えられた入力に対するサンプル出力? – mrsrinivas

答えて

3

出力がgroupByKeyの場合と似ている場合は、他の方法ではなくgroupByKeyを絶対に使用する必要があります。 reduceByKeycombineByKeyなどは、groupByKeyを使用した後に集計した場合と比べて効率的です(他の方法の1つと同じ結果が得られます)。

希望の結果がRDD[key,iterable[value]]であるため、リストを自分で作成するか、groupByKeyにすると同じ作業量になります。 groupByKeyを自分で再実装する必要はありません。 groupByKeyの問題はその実装ではなく、分散アーキテクチャにあります。

groupByKeyとこれらのタイプの最適化について詳しくは、hereをお読みください。

関連する問題