私は異なる列( 'features')を含むデータフレームを持っています。Spark - filter()の代わりにgroupBy()を使用して、データフレームの計算時間を最適化します。
私の目標は、列X統計的尺度を計算することです: 平均、スタンダール・偏差、分散
をしかし、列Y.への依存 例えばとともに、それらのすべてを計算しますY = 1のすべての行を取得し、mean、stddev、varを計算すると はY = 2のすべての行に対して同じ処理を行います。
私の現在の実装は次のようになります。
print "For CONGESTION_FLAG = 0:"
log_df.filter(log_df[flag_col] == 0).select([mean(size_col), stddev(size_col),
pow(stddev(size_col), 2)]).show(20, False)
print "For CONGESTION_FLAG = 1:"
log_df.filter(log_df[flag_col] == 1).select([mean(size_col), stddev(size_col),
pow(stddev(size_col), 2)]).show(20, False)
print "For CONGESTION_FLAG = 2:"
log_df.filter(log_df[flag_col] == 2).select([mean(size_col), stddev(size_col),
pow(stddev(size_col), 2)]).show(20, False)
私はより速く、それらの計算の実行を行うために(私は1GBのデータでこれを使用していることをfilter()
方法は、計算時間の面で無駄で語った、とアドバイスを受けました。ファイル)の場合は、groupBy()
メソッドを使用する方が良いでしょう。
誰かが、代わりにgroupByを使用して、同じ計算を行うためにそれらの行を変換するのを手伝ってもらえますか? シンタックスが混乱してしまい、正しく処理できませんでした。
ありがとうございました。
感謝を説明的な答え。 aggregated_dfを作成する前に 'log_df.cache()'を使用するとパフォーマンスが向上しますか? – Adiel
log_dfの生成方法と使用方法によって異なります。ファイルを作成するだけの場合(たとえば、ファイルから読み込む場合)、何のメリットもありません。あなたが他のもののためにそれを再使用し、十分なメモリを持っているならおそらくはい。 –
私はテキストファイル(1GB)を読んでいて、いくつかの列の値を操作していて、他の列のいくつかの条件でいくつかの行を削除してから計算に行きます。私はgroupBy \ aggregateメソッドによる計算の直前にdf.cache()を追加したい – Adiel