2017-04-08 10 views
0

私の要件は、RDDの各グループの最大値を見つけることです。カスタム関数をscalaのRDDのreduceByKeyに渡す方法

私は以下を試しました。

scala> val x = sc.parallelize(Array(Array("A",3), Array("B",5), Array("A",6))) 
x: org.apache.spark.rdd.RDD[Array[Any]] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> x.collect 
res0: Array[Array[Any]] = Array(Array(A, 3), Array(B, 5), Array(A, 6))   

scala> x.filter(math.max(_,_)) 
<console>:30: error: wrong number of parameters; expected = 1 
       x.filter(math.max(_,_)) 
          ^

私も以下を試しました。 オプション1:

scala> x.filter((x: Int, y: Int) => { math.max(x,y)}) 
<console>:30: error: type mismatch; 
found : (Int, Int) => Int 
required: Array[Any] => Boolean 
       x.filter((x: Int, y: Int) => { math.max(x,y)}) 

はオプション2:この権利を取得する方法

scala> val myMaxFunc = (x: Int, y: Int) => { math.max(x,y)} 
myMaxFunc: (Int, Int) => Int = <function2> 

scala> myMaxFunc(56,12) 
res10: Int = 56 

scala> x.filter(myMaxFunc(_,_)) 
<console>:32: error: wrong number of parameters; expected = 1 
       x.filter(myMaxFunc(_,_)) 

+0

-であるとして、あなたはそれを使用することができ、math.max用に独自のラッパー関数を記述する必要はありませんか?フィルターを使用する理由 – stholzm

答えて

1

私は推測することができますが、おそらくあなたは何をしたい:

val rdd = sc.parallelize(Array(("A", 3), ("B", 5), ("A", 6))) 
val max = rdd.reduceByKey(math.max) 
println(max.collect().toList) // List((B,5), (A,6)) 

の代わりに「この権利を取得する方法は?」あなたはあなたの期待される結果が何であるかを説明したはずです。(なぜ??)

  • reduceByKeyだけPairRDD秒で動作しますので、あなたはタプルの代わりArray[Any]を必要とする(とにかく悪いタイプである)filter代わりのreduceByKeyを使用して

    • :私はあなたがいくつかミスをしたと思います
    • あなたはreduceByKeyがあなたのコードである
  • 関連する問題