0

私はSparkSQL 2.2.0を使用してCassandraからデータをロードし、Elasticsearchにインデックスを付けます。私が持っているデータは、顧客(最初のテーブルpeople)と注文(2番目のテーブルorders)で構成されています。
テーブルオーダーには、対応するカスタマーを指す列person_idがあります。
peopleテーブルとordersを照会(そしてElasticsearchで後で索引付け)する必要があるため、購入したオーダーの数を各顧客に与えることができます。
私が考え出した最も簡単な方法は、2つのテーブルをorg.apache.spark.sql.Dataset<Row>に読み込み、person_id列のに参加させることです。その後、私はgroupBy(person_id)です。
これは私にperson_idcountという2つの列を持つデータセットを提供します。私はpeopleテーブルに戻ってくる必要がありますので、他の人のデータでカウントすることができます。SparkSQLが親子データセットを結合する

Dataset<Row> peopleWithOrders = people.join(orders, people.col("id").equalTo(orders.col("person_id")), "left_outer"); 

Dataset<Row> peopleOrdersCounts = peopleWithOrders.groupBy("id").count().withColumnRenamed("id", "personId"); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

peopleテーブルは1_000_000行とorders 1 2_500_000を有しています。各顧客には2つまたは3つの注文があります。
私は2,2 GBのIntel Core i7プロセッサと16 GBの1600 MHz DDR3メモリを搭載したMAC Book Proを使用しています。すべてのCassandra、Spark 2.2マスターと(単一の)ワーカーは同じマシン上にあります。
これらの3つの結合には15〜20秒かかります。
私の質問です:パフォーマンスの向上の余地はありますか? ウィンドウ集計関数には、ログにShuffleMapTaskが表示されるため、メリットがあります。

ありがとうございます。

答えて

0

私は最初の手順は不要だと思います。

Dataset<Row> peopleOrdersCounts = orders.groupBy("person_id").count(); 

Dataset<Row> peopleWithOrderCounts = people.join(personsOrdersCounts, people.col("id").equalTo(peopleOrdersCounts.col("personId")), "left_outer") 
      .withColumnRenamed("count", "nbrOfOrders") 
      .select("id", "name", "birthDate", "nbrOfOrders"); 

私はこれが役立つことを望みます。

+0

はい、そうです。私の悪い。しかしそれはまだ「比較的遅い」(ab 16s)です。私は "Window Aggregate Functions"が助けになるか、これを行う通常の方法であるかどうか疑問に思っていた。 –

+0

私が知る限り、これを行う方法です。特に、「グループバイ」の場合。あなたはUser Defined Aggregate Functions(UDAF)を見ることができますが、それらは特定のケースでもあります。これを遅くしている他の操作はありますか? – Nikhil

関連する問題