私はSparkSQL 2.2.0を使用してCassandraからデータをロードし、Elasticsearchにインデックスを付けます。私が持っているデータは、顧客(最初のテーブルpeople
)と注文(2番目のテーブルorders
)で構成されています。
テーブルオーダーには、対応するカスタマーを指す列person_id
があります。
people
テーブルとorders
を照会(そしてElasticsearchで後で索引付け)する必要があるため、購入したオーダーの数を各顧客に与えることができます。
私が考え出した最も簡単な方法は、2つのテーブルをorg.apache.spark.sql.Dataset<Row>
に読み込み、をperson_id
列のに参加させることです。その後、私はgroupBy(person_id)
です。
これは私にperson_id
とcount
という2つの列を持つデータセットを提供します。私はpeople
テーブルに戻ってくる必要がありますので、他の人のデータでカウントすることができます。SparkSQLが親子データセットを結合する
Dataset<Row> peopleWithOrders = people.join(orders, people.col("id").equalTo(orders.col("person_id")), "left_outer");
Dataset<Row> peopleOrdersCounts = peopleWithOrders.groupBy("id").count().withColumnRenamed("id", "personId");
Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer")
.withColumnRenamed("count", "nbrOfOrders")
.select("id", "name", "birthDate", "nbrOfOrders");
people
テーブルは1_000_000行とorders
1 2_500_000を有しています。各顧客には2つまたは3つの注文があります。
私は2,2 GBのIntel Core i7プロセッサと16 GBの1600 MHz DDR3メモリを搭載したMAC Book Proを使用しています。すべてのCassandra、Spark 2.2マスターと(単一の)ワーカーは同じマシン上にあります。
これらの3つの結合には15〜20秒かかります。
私の質問です:パフォーマンスの向上の余地はありますか? ウィンドウ集計関数には、ログにShuffleMapTaskが表示されるため、メリットがあります。
ありがとうございます。
はい、そうです。私の悪い。しかしそれはまだ「比較的遅い」(ab 16s)です。私は "Window Aggregate Functions"が助けになるか、これを行う通常の方法であるかどうか疑問に思っていた。 –
私が知る限り、これを行う方法です。特に、「グループバイ」の場合。あなたはUser Defined Aggregate Functions(UDAF)を見ることができますが、それらは特定のケースでもあります。これを遅くしている他の操作はありますか? – Nikhil