https://spark.apache.org/docs/latest/programming-guide.html(「関数をスパークする」を参照してください)、私のケースでは、ケースクラスで型付きデータセットを使用しています。私はマッピングメソッドを保持するためにシングルトンオブジェクトを使用しようとしています。ステージのパフォーマンスを最適化するために必要な機能をパッケージ化する方法(データセットをあるタイプから別のタイプに変換し、寄木細工に書き込む方法)をどのように最適化するのがよいか疑問に思っています。データセット、大きなJavaクラス、およびシングルトンを使用する場合のスパークパス関数
現在、ステージ段階では、約3百万行(約1.5時間)の非常に長い時間がかかっています。約880 MBのデータがs3の寄木細工に出力されます。
min executors = 3、max executors = 10、各実行プログラムの4つのコア、ドライバメモリ8GBを使用してクラスタモードで実行しています。
-
ハイレベル符号化部:私は別のケースクラスC2に1ケースクラスC1をマッピングしています
。 C1およびC2には、java.sql.Timestamp、Option [String] Option [Int]、String、Int、BigIntなど、さまざまなタイプの約16個のフィールドがあります。
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
C1からC2にマッピングするために、私はhttps://github.com/drtimcooper/LatLongToTimezoneからコピー非常に大きなJavaクラス Jの(静的メソッド)機能を必要とします。
public class J {
public static String getValue((float) v) = ...
}
私は、マッピング関数によって呼び出された有用なクラスUtilの中にマッピング関数を作成しました。
=========
は基本的に私のコードの流れは次のようになります。
case class C1(field1 : _, field2 : _, field3 : _, ...)
case class C2(field1 : _, field2 : _, field3 : _, ...)
// very large java class J that only contains static methods
public class J {
public static String getValue((float) v) = ...
...
}
object Util {
def m1(i: Int): Int = ...
def m2(l: Option[BigDecimal], l2: Option[BigDecimal]): Int = {
J.getValue(l.get, l2.get)
}
...
def convert_C1_to_C2(c1: C1): C2 = {
C2(
field1 = m1(c1.field1),
field2 = m2(c1.field2, c1.field3),
...
}
}
dataframe.as[C1].map(Util.convert_C1_to_C2)
.mode(SaveMode.Overwrite)
.parquet("s3a://s3Path")
はこれを書くためのより最適な方法はありますか?あるいは、私がこれをやったやり方で誰かが目障りなエラーを指摘できますか?私のコードを見て、私はなぜそれがタスクを完了するために長い時間をかけているか分からない。
私はすでにs3のファイル量を減らすために16のパーティションをまとめることを試みましたが、これはジョブの実行を大幅に遅くするようです。典型的には、合体することなく64個のパーティションが存在する。