2016-05-05 18 views
1

異なるパーティショナーのセットで2つのrddがあります。Apache Spark:異なるパーティショナーと2つのRDDに参加する

case class Person(name: String, age: Int, school: String) 
case class School(name: String, address: String) 

rdd1私は人のageに基づいて分配し、その後、schoolにキーを変換したPersonのRDD、です。

val rdd1: RDD[Person] = rdd1.keyBy(person => (person.age, person)) 
          .partitionBy(new HashPartitioner(10)) 
          .mapPartitions(persons => 
           persons.map{case(age,person) => 
            (person.school, person) 
          }) 

rdd2学校のnameでグループ化されたSchoolのRDDです。

val rdd2: RDD[School] = rdd2.groupBy(_.name) 

さて、rdd1は、人間の年齢に基づいてパーティション化されているので、同じ年齢のすべての人が同じパーティションになります。そして、rdd2は学校の名前に基づいて(デフォルトで)分割されています。

rdd1.leftOuterJoin(rdd2)には、rdd1がrdd2に比べて非常に大きいので、rdd1はシャッフルされません。また、私は結果をageに分割されたCassandraに出力します。したがって、現在のパーティショニングrdd1は、後で書き込むプロセスを固定します。 rdd2が使用可能なメモリよりも大きいので、 1.シャッフルrdd1と 2放送「RDD2」:

ずにそこに2 RDDSに参加する方法はあります。

注:結合されたrddは、年齢に基づいて分割する必要があります。

+0

「leftOuterJoin [W](その他:RDD [(K、W)]、パーティショナー:パーティショナー)」という署名を使用し、rdd1と同じパーティショナーを使用すると便利です。 –

+0

どちらも異なるキーで 'HashPartitioner'です。 'HashPartitioner'でカスタムキーをどのように指定しますか?それは、入力として複数のパーティションしか受け付けません。 – shashwat

+0

注:rdd1〜100GBのサイズ、およびrdd2〜10GBのサイズ。 私はこのようなrdd2を15個持っています。これらはrdd1と結合する必要があります。そして、そのような小さなrdd(ここではrdd2)は、rdd1の異なるキーで結合されます。 rdd1のシャッフルを避けるために、固定キーに基づいてパーティションを分割してシャッフルしないようにしました。 – shashwat

答えて

1

rdd1とrdd2の2つのrddがあり、結合操作を適用したいとします。 rddsがパーティション化されている(パーティションが設定されている)場合rdd3 = rdd1.join(rdd2)を呼び出すと、rdd3でrdd3パーティションが作成されます。 rdd3は常にrdd1(最初の親、結合が呼び出された親)からハッシュ・パーティションを取る。

関連する問題