2017-02-06 27 views
0

私はPython 2.7.5でSpark 1.5.2を使用しています。集計データフレームの操作後にPysparkフリーズ

私はpysparkのREPLで実行このコードを持っている:

from pyspark.sql import SQLContext 
ctx = SQLContext(sc) 

df = ctx.createDataFrame([("a",1),("a",1),("a",0),("a",0),("b",1),("b",0),("b",1)],["group","conversion"]) 

from pyspark.sql.functions import col, count, avg 
funs = [(count,"total"),(avg,"cr")] 
aggregate = ["conversion"] 
exprs = [f(col(c)).alias(name) for f,name in funs for c in aggregate] 
df3 = df.groupBy("group").agg(*exprs).cache() 

これまでのコードが正常に動作し、私はdf3を確認することができます。

>>> df3.collect() 
[Row(group=u'a', total=4, cr=0.5), Row(group=u'b', total=3, cr=0.6666666666666666)] 

しかし、私がしようとすると:

df3.agg(sum(col('cr'))).first()[0] 

PySparkはその合計を計算できません。しかし、df3.rdd.reduce(lambda x,y: x[2]+y[2])はうまく動作します。

したがって、合計を計算する最初のコマンドの問題は何ですか?

答えて

1

まず、pysparkのsum関数をインポートする必要があります。from pyspark.sql.functions import sum。そうでなければ、Pythonの組み込みのsumが呼び出され、数字のシーケンスを合計するだけです。

+0

実際、 'sum'をインポートするときに動作します。ありがとうございます。しかし、私が合計しようとしていたデータフレームには2行しかありませんでしたが、私はpysparkをローカルに(分散モードではなく)実行していました。それでなぜ内蔵の 'sum'関数が遅いのか知っていますか? – BillyBoy

+0

組み込み関数 'sum'は機能しません。 Sparkはおそらく集計段階を再試行していました(毎回失敗しています)。これは長時間続く可能性があります。 – Mariusz

関連する問題