0

私はSparkプログラムを開発しています。これは、各ユーザーが比較的大きなデータフレーム(〜137.5M行)になる確率を計算しています。私がする必要があるのは、これらのユーザーの上位10%(10人は任意であり、もちろん変更可能です)を取得してファイルに保存することです。比較的大きいSparkデータフレームの最高のパーセンテージをファイルに保存する方法

最小限の例では、次のようになります。

  • このデータフレームを考える:hc.sparkContext.parallelize(Array(("uid1", "0.5"), ("uid2", "0.7"), ("uid3", "0.3"))).toDF("uuid", "prob")
  • そして、私は出力が( "UID2"、 "0.7")にしたい0.3
  • のしきい値を与えられ、保存を"uid2"が最も高い確率を持ち、データフレームからユーザーの上位0.3%を取る必要があるだけなので、ファイル "output"に変換してください。

そして私の質問は次のとおりです。比較的大きなデータフレームでこれを行うには?

入力データフレームの10%のユーザー数を計算し、そのサイズでtopを使用できます。トップを使用している場合

  1. - データは、まず各エグゼキュータから上位10%を取ることによって、シャッフル前に還元し、次いでシャッフルデータのうち、10%を取るされている。しかし、私はこれについて、2つの懸念を持っていますか?そうでない場合は、私が提案したことを行うための組み込みの方法がありますか?それとも自分で実装すればいいのですか?
  2. Topはまだかなり大きい配列を返します。私はそれをデータフレームとして保持し、その出力を保存するだけです(シャッフル後にデータを再分割することもできます)。これを配列に変換してから並列化することなくこれを行う方法はありますか?

を必要に応じて、私は現在、あなたは窓関数percent_rank()を使用することができますスパークに事前に1.6.1

おかげ

+0

はい、それは、ウィンドウ関数を使用して、おそらく可能ですが、例えば、データセットと期待される出力とのご質問を説明してください。 – mtoto

+0

@mtoto私は達成しようとしているものの簡略化した例を追加しました。あなたは窓関数について詳しく説明できますか? – Gideon

+0

[こちら](http://stackoverflow.com/a/40048439/4964651)を参照してください。ただし、グループごとにトップロー*を探していない場合は、操作を並列化することはできません。 – mtoto

答えて

1

を使用していRDDSの代わりに、データフレームを使用して、心をいけません。ただし、グループ別にランク付けしていないため、partitionBy()を使用して操作を並列化することはできません。ここでは例がpySparkにあります:

from pyspark.sql.window import Window 
from pyspark.sql.functions import percent_rank, col 

window = Window.partitionBy().orderBy(df['prob'].desc()) 

df.select('*', percent_rank().over(window).alias('rank')) 
    .filter(col('rank') <= 0.3) # top 30% for example 
    .show() 
+----+----+----+ 
|uuid|prob|rank| 
+----+----+----+ 
|uid2| 0.7| 0.0| 
+----+----+----+ 

データ:

df = sc.parallelize([("uid1", "0.5"), 
        ("uid2", "0.7"), 
        ("uid3", "0.3")]).toDF(["uuid", "prob"])