2016-12-12 9 views
3

私は1,300万行と800個の列を持つPySparkデータフレームを持っています。このデータを正規化する必要があります。このコードは、小さな開発データセットで動作します。大規模なPySparkデータフレームを正規化すると、CodeGenが64 KBを超えてエラーになります

 at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893 
) 
     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950) 
     at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947) 
     at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) 
     at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) 
     ... 44 more 
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache. 
spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB 
     at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941) 
     at org.codehaus.janino.CodeContext.write(CodeContext.java:836) 
     at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251) 
     at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933) 
     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346) 
     at org.codehaus.janino.UnitCompiler.access$7100(UnitCompiler.java:185) 
     at org.codehaus.janino.UnitCompiler$10.visitBooleanLiteral(UnitCompiler.java:3267) 

が似て見えるの周りにいくつかのSpark JIRA issuesありますが、これらはすべて解決マークされます。完全なデータセットを使用している場合

def z_score_w(col, w): 
    avg_ = avg(col).over(w) 
    stddev_ = stddev_pop(col).over(w) 
    return (col - avg_)/stddev_ 

w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)  
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns] 

normDF = signalsDF.select(norm_exprs) 

はしかし、私は、コード生成と例外に遭遇します。関連性のあるthis SO questionもありますが、答えは代替技術です。

データフレームの列のバッチを正規化する独自の回避策があります。これはうまくいきますが、複数のデータフレームで結ばれてしまいます。これは遅くなります。

私の質問には、大きなデータフレームを正規化する代替技術がありますか?

私はspark-2.0.1を使用しています。

答えて

2

明白な問題の1つは、ウィンドウ関数を使用する方法です。次のフレーム:

Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)  

実際には役に立たないです。パーティション列がなければ、最初にすべてのデータを単一のパーティションに再シャッフルします。このスケーリングの方法は、スケーリングをグループ単位で実行する場合にのみ有効です。残念ながら、両方の入力としてVectorデータを必要

  • pyspark.ml.feature.StandardScaler
  • pyspark.mllib.feature.StandardScaler

スパーク機能を拡張するために使用することができる2つのクラスを提供します。 ML

from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler 
from pyspark.ml import Pipeline 

scaled = Pipeline(stages=[ 
    VectorAssembler(inputCols=df.columns, outputCol="features"), 
    MLScaler(withMean=True, inputCol="features", outputCol="scaled") 
]).fit(df).transform(df).select("scaled") 

これは、元の形が必要な場合はさらにscaled列を拡張する必要があります。 MLlibで

:列の数に関するコード生成の問題がある場合

from pyspark.mllib.feature import StandardScaler as MLLibScaler 
from pyspark.mllib.linalg import DenseVector 

rdd = df.rdd.map(DenseVector) 
scaler = MLLibScaler(withMean=True, withStd=True) 

scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns) 

後者の方法がより有用であることができます。あなたはグローバルな統計

from pyspark.sql.functions import avg, col, stddev_pop, struct 

stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first() 

を計算して選択するには、この問題にアプローチすることができます

別の方法:

df.select(*[ 
    ((col(c) - mean)/std).alias(c) 
    for (c, (mean, std)) in zip(df.columns, stats) 
]) 

あなたのコメントに続いて、あなたが考えることができる最も簡単な解決策は、numpyの、いくつかの基本的なを使用して表現することができます変換:

rdd = df.rdd.map(np.array) # Convert to RDD of NumPy vectors 
stats = rdd.stats() # Compute mean and std 
scaled = rdd.map(lambda v: (v - stats.mean())/stats.stdev()) # Normalize 

そしてDataFrame

scaled.map(lambda x: x.tolist()).toDF(df.columns) 
関連する問題