異なるパーティショナーのセットで2つのrddがあります。Apache Spark:異なるパーティショナーと2つのRDDに参加する
case class Person(name: String, age: Int, school: String)
case class School(name: String, address: String)
rdd1
私は人のage
に基づいて分配し、その後、school
にキーを変換したPerson
のRDD、です。
val rdd1: RDD[Person] = rdd1.keyBy(person => (person.age, person))
.partitionBy(new HashPartitioner(10))
.mapPartitions(persons =>
persons.map{case(age,person) =>
(person.school, person)
})
rdd2
学校のname
でグループ化されたSchool
のRDDです。
val rdd2: RDD[School] = rdd2.groupBy(_.name)
さて、rdd1
は、人間の年齢に基づいてパーティション化されているので、同じ年齢のすべての人が同じパーティションになります。そして、rdd2
は学校の名前に基づいて(デフォルトで)分割されています。
rdd1.leftOuterJoin(rdd2)
には、rdd1がrdd2に比べて非常に大きいので、rdd1
はシャッフルされません。また、私は結果をage
に分割されたCassandraに出力します。したがって、現在のパーティショニングrdd1
は、後で書き込むプロセスを固定します。 rdd2
が使用可能なメモリよりも大きいので、 1.シャッフルrdd1
と 2放送「RDD2」:
ずにそこに2 RDDSに参加する方法はあります。
注:結合されたrddは、年齢に基づいて分割する必要があります。
「leftOuterJoin [W](その他:RDD [(K、W)]、パーティショナー:パーティショナー)」という署名を使用し、rdd1と同じパーティショナーを使用すると便利です。 –
どちらも異なるキーで 'HashPartitioner'です。 'HashPartitioner'でカスタムキーをどのように指定しますか?それは、入力として複数のパーティションしか受け付けません。 – shashwat
注:rdd1〜100GBのサイズ、およびrdd2〜10GBのサイズ。 私はこのようなrdd2を15個持っています。これらはrdd1と結合する必要があります。そして、そのような小さなrdd(ここではrdd2)は、rdd1の異なるキーで結合されます。 rdd1のシャッフルを避けるために、固定キーに基づいてパーティションを分割してシャッフルしないようにしました。 – shashwat