2016-10-23 9 views
0

私はPythonでSparkを学習しようとしていますが、combineByKeyでキーと値のペアの値の平均をとっています。実際、私の混乱はcombineByKeyの構文ではなく、その後になるものです。典型的な例(O'Rielly 2015 Learning Spark Book)は、多くの場所でウェブ上で見ることができます。 here's onePython Spark combineByKey Average

問題はsumCount.map(lambda (key, (totalSum, count)): (key, totalSum/count)).collectAsMap()ステートメントにあります。 spark 2.0.1とiPython 3.5.2を使用すると、構文エラー例外がスローされます。それを単純化すると(そして、O'Reillyの本にある):sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap()は、SparkがJavaの例外を使ってバットに夢中になりますが、私はTypeError: <lambda>() missing 1 required positional argument: 'v'というエラーを記録します。

Spark & Pythonの最新バージョンで実際に動作するこの機能の例を教えてもらえますか?完全を期すために、私は私自身の最小作業(というか、非稼働)の例を含めました:

In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)]) 
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])) 
In: cbk.collect() 
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))] 
In: cbk.map(lambda key,val:(k,val[0]/val[1])).collectAsMap() <-- errors 

をそれは[(e[0],e[1][0]/e[1][1]) for e in cbk.collect()]を計算するのは簡単だが、私はむしろ働く「Sparkic」の方法を取得したいです。ステップによる

答えて

2

ステップ:

  • lambda (key, (totalSum, count)): ...をPythonで除去されたTuple Parameter Unpackingいわゆるれます。
  • RDD.mapは、単一の引数として期待する関数をとります。使用しようとしている関数:

    2つの引数を必要とする関数であり、1つではありません。

    cbk.mapValues(lambda x: x[0]/x[1]) 
    
  • 最後に数値的に安定したソリューションになります。あなたはまた、このmapValuesとはるかに単純にすることができ

    def get_mean(key_vals): 
        key, (total, cnt) = key_vals 
        return key, total/cnt 
    
    cbk.map(get_mean) 
    

    :2.xの構文からの有効な翻訳が

    lambda key_vals: (key_vals[0], key_vals[1][0]/key_vals[1][1]) 
    

    かだろうbe:

    from pyspark.statcounter import StatCounter 
    
    (pRDD 
        .combineByKey(
         lambda x: StatCounter([x]), 
         StatCounter.merge, 
         StatCounter.mergeStats) 
        .mapValues(StatCounter.mean)) 
    
-1

ウィンドウコンセプトを使用すると、特定の列の値を平均化できます。次のコードを考えてみましょう:

import pyspark.sql.functions as F 
from pyspark.sql import Window 
df = spark.createDataFrame([('a', 2), ('b', 3), ('a', 6), ('b', 5)], 
          ['a', 'i']) 
win = Window.partitionBy('a') 
df.withColumn('avg', F.avg('i').over(win)).show() 

がもたらすであろう:

+---+---+---+ 
| a| i|avg| 
+---+---+---+ 
| b| 3|4.0| 
| b| 5|4.0| 
| a| 2|4.0| 
| a| 6|4.0| 
+---+---+---+ 

を平均凝集が個別に各ワーカーで行われ、ホストへのラウンドトリップを必要とせず、したがって、効率的。

+0

ありがとうございましたが、この質問に既に投稿されているすべてのものが明確になるので、私は特に 'combineByKey' - >' map'オペレーションがエラーを起こした理由について質問していました。 –

関連する問題