RDDに同じエグゼキュータと同じパーティショナーを割り当てて、ネットワークトラフィックを回避し、コグループや結合などのシャッフル操作にステージ境界がなく、すべての変換が1つのステージ。この我々でSparkで同じパーティショナーを割り当てる
public Seq<String> getPreferredLocations(Partition split){
listString.add("11.113.57.142");
listString.add("11.113.57.163");
listString.add("11.113.57.150");
List<String> finalList = new ArrayList<String>();
finalList.add(listString.get(split.index() % listString.size()));
Seq<String> toReturnListString = scala.collection.JavaConversions.asScalaBuffer(finalList).toSeq();
return toReturnListString;
}
:だから、この私たちのように(Scalaでは)RDD.classからオーバーライドされたgetPreferredLocation機能を持つJavaで私たちのカスタムRDDクラス(ExtendRDD.class)でRDDをラップ達成する
どのノードにRDDをクラスタに置くかに関してスパークの挙動を制御することができる。しかし、今問題は、これらのRDDのパーティション化者 が異なるため、スパークはそれらがシャッフルに依存しているとみなし、これらのシャッフル操作のために複数のステージを作成するということです。それはこれらのRDDSが同じパーティから来ることを考慮しなければならない
同じステージの下に置くための火花のためにpublic Option<Partitioner> partitioner() {
Option<Partitioner> optionPartitioner = new Some<Partitioner>(this.getPartitioner());
return optionPartitioner;
}
:私たちは、同じカスタムRDDの同じRDD.classのパーティショナメソッドをオーバーライドしてみました。 スパークは2つのRDDに対して異なるパーティショナーを取り、シャッフル操作のための複数のステージを作成するため、私たちのパーティショナーメソッドは動作していないようです。
私たちは、私たちのようにカスタムRDDとScalaのRDDを包んだ:私たちは、同様の方法で別のカスタムRDDを作成し、そのRDDのうち、PairRDD(pairRDD2)を取得
ClassTag<String> tag = scala.reflect.ClassTag$.MODULE$.apply(String.class);
RDD<String> distFile1 = jsc.textFile("SomePath/data.txt",1);
ExtendRDD<String> extendRDD = new ExtendRDD<String>(distFile1, tag);
。その後、我々はpartitionBy機能を使用してPairRDDFunctionオブジェクトにextendRDDオブジェクトと同じパーティショナを適用し、それにコグループを適用しよう:それはコグループの変革に遭遇したときに火花が複数のステージを作成するよう
RDD<Tuple2<String, String>> pairRDD = extendRDD.keyBy(new KeyByImpl());
PairRDDFunctions<String, String> pair = new PairRDDFunctions<String, String>(pairRDD, tag, tag, null);
pair.partitionBy(extendRDD2.getPartitioner());
pair.cogroup(pairRDD2);
このすべてが動作するようには思えません。
同じパーティショナーをRDDにどのように適用できますか?
ハッシュパーティショニングまたはレンジパーティショニングを使用しています –
HashPartitioning –