13種類のETL操作を持つシステムでは、Spala 2.xとScalaを併用しています。それらのうち7つは比較的単純で、それぞれが単一のドメインクラスによって駆動され、主にこのクラスと負荷の処理方法のニュアンスが異なります。次のようにこの実施例の目的は、ロードされる7つのピザのトッピングがあることを言うためSpark/Scala、Datasetsおよびケースクラスを使用する多態性
負荷クラスの簡略化されたバージョンは、であり、ここでペパロニだ:すなわち
object LoadPepperoni {
def apply(inputFile: Dataset[Row],
historicalData: Dataset[Pepperoni],
mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map{ case row : Row =>
PepperoniRaw(
weight = row.getAs[String]("weight"),
cost = row.getAs[String]("cost")
)
}.toDS()
val validatedData: Dataset[PepperoniRaw] = ??? // validate the data
val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data
val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map{ case datum : PepperoniRaw =>
Pepperoni(value = ???, key1 = ???, key2 = ???)
}.toDS()
val joinedData = dedupedData.joinWith(historicalData,
historicalData.col("key1") === dedupedData.col("key1") &&
historicalData.col("key2") === dedupedData.col("key2"),
"right_outer"
)
joinedData.map { case (hist, delta) =>
if(/* some condition */) {
hist.copy(value = /* some transformation */)
}
}.flatMap(list => list).toDS()
}
}
クラスは、一連の実行します操作はほとんど同じで、常に同じ順序ですが、「生」から「ドメイン」へのマッピングやマージ機能のように、トッピングごとにわずかに異なる場合があります。
7つのトッピング(マッシュルーム、チーズなど)でこれを行うには、構造とロジックがすべての負荷に共通であるため、単にクラスをコピー/ペーストしてすべての名前を変更しないでください。持っている、
object Load {
def apply[R,D](inputFile: Dataset[Row],
historicalData: Dataset[D],
mergeFun: (D, R) => D): Dataset[D] = {
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val rawData: Dataset[R] = inputFile.rdd.map{ case row : Row =>
...
そして、このような「ドメイン」と「生」、またはマージからのマッピングとして、各クラス固有の動作用:代わりに、私はむしろ、このように、ジェネリック型とジェネリック「負荷」クラスを定義したいです特性を実装する抽象クラスなどがあります。これは典型的な依存性注入/多型パターンである。
しかし、私はいくつかの問題を抱えています。 Spark 2.xでは、エンコーダはネイティブタイプおよびケースクラスに対してのみ提供され、クラスをケースクラスとして一般的に識別する方法はありません。したがって、推論されたtoDS()やその他の暗黙的な機能は、ジェネリック型を使用しているときは利用できません。
また、this related question of mineに記載されているように、ジェネリックスを使用する場合にはケースクラスcopy
メソッドは使用できません。
私はScalaやHaskellのような型クラスやアドホック多型などの他のデザインパターンを検討しましたが、Spark Datasetは基本的には抽象的に定義できないケースクラスのみで動作します。
これはSparkシステムの一般的な問題ですが、解決策を見つけることができません。どんな助けもありがたい。
私はそれを試してみましょう心のブロアーのようなものですありがとうございました。私は同様のエラーaggregateByKey()と同様にコピー()(私のポストの関連する質問のリンクを参照してください)、それの範囲に必要な実装をもたらすことができる同様の魔法はありますか? –
より一般的には、欠落している特定の暗黙的なものを追跡し、そのタイプが完全に指定されているコールサイトまでパラメータにすることです。 aggregateByKeyの場合は、ClassTag [D](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions )。私はあなたの質問を個別に 'コピー'について見て、良いアイデアがあれば返信します –