1

以下のユースケースを解決するための最適化された、または最良のパフォーマンスのアプローチとはスパークデータフレームでの複数列計算の最適化方法?

私たちの関心が1列のメッセージである100万行と100列のデータフレームを考えてみましょう。私は、メッセージ内のキーワードの存在を照合する条件に基づいて3つの新しい列を構築する必要があります。

  • メッセージ:コードの開発に向けスタックオーバーフローの寄与は日によって 増加の日です
  • FLAG1キーワード:スタック、
  • FLAG2キーワード弛み:TwitterやFacebook、のWhatsApp
  • FLAG3キーワード:フロー、実行し、増加する

予想される出力:(メッセージ、フラグ1、フラグ2、フラグ3)コード開発に対するスタックオーバーフローの貢献度が日々増加しています。1,0,0

アプローチ1

val tempDF = df.withColumn("flag1",computeFlag(col("message"))).withColumn("flag2",computeFlag(col("message"))).withColumn("flag3",computeFlag(col("message"))) 

アプローチ2

val tempDF = df.withColumn("flagValues",computeMultipleFlags(col("message"))).withColumn("_tmp", split($"flagValues","#")).select($"message",$"_tmp".getItem(0).as("flag1"),$"_tmp".getItem(1).as("commercial"),$"_tmp".getItem(2).as("flag2"),$"_tmp".getItem(3).as("flag3")).drop("_tmp") 

UDF:computeFlag各キーワードリスト

UDFの正確な一致に基づいて、1または0を返す:へcomputeMultipleFlags flag1、flag2のそれぞれのキーワードの完全一致に基づいて、1または0の区切られた結果を返す。 dフラグ3:例1#0#0

私は両方のアプローチを使用して解決しましたが、パフォーマンスを向上させるにはアプローチ2を参照してください。ご意見をお聞かせください。

  • スパークデータフレームは、デフォルトでは並列化されているが、どのようなアプローチ1.上この場合 作品はFLAG1、FLAG2、FLAG3列は パラレルまたはシーケンシャルに計算されますでしょうか?列に対して複数行の複数のスレッド
    計算:

  • は自動的に並列プロセスの私の入力欄 「メッセージ」データフレームをスパークのでしょうか?どちらの場合も

答えて

0

あなたは、処理中に、貴重な時間を割いて、columnserializationdeserializationを必要とする、udf機能を使用していました。あなたがシリアライズとデシリアライズが一つだけudf機能を定義している2番目の場合、各フラグ生成

ための3つの時間を発生したことを意味し、withColumnを使用して、同じudf 3回呼ばれている、あなたの最初のケースで

。したがって、最初のものより速く効率的に実行することは明らかです。そしてあなたは良い記号であるsplitの機能を使用しました。

Dataframe Sは各executorsで自然にので、各関数呼び出しが実行され得る平行に分布しているが、各機能を順次すなわちデータを並列化が、機能/タスクがまだシーケンシャルあり得る実行されます。

私は説明がudf場合のように、追加のシリアライズとデシリアライズを必要としないSpark functionsを使用して、あなたのケースのためのよりよい解決策があり

明らかであると思います。次のソリューションを使用できます。

df.withColumn("_tmp", split($"message", " ")) 
    .select($"message", 
    when(array_contains($"_tmp", "stack") || array_contains($"_tmp", "slack"), "1").otherwise("0") as "flag1", 
    when(array_contains($"_tmp", "twitter") || array_contains($"_tmp", "facebook") || array_contains($"_tmp", "whatsapp"), "1").otherwise("0") as "flag2", 
    when(array_contains($"_tmp", "flow") || array_contains($"_tmp", "run") || array_contains($"_tmp", "increase"), "1").otherwise("0") as "flag3") 
    .show(false) 
関連する問題