2017-10-25 15 views

答えて

3
  • パラメータで渡された重みに基づいて既存のRDDを除算し、RDDの配列を返します。RDD.randomSplitを使用できます。

内部作業は以下のようになります...

/** 
* Randomly splits this RDD with the provided weights. 
* 
* @param weights weights for splits, will be normalized if they don't sum to 1 
* @param seed random seed 
* 
* @return split RDDs in an array 
*/ 
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = { 
    require(weights.forall(_ >= 0), 
    s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}") 
    require(weights.sum > 0, 
    s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}") 

    withScope { 
    val sum = weights.sum 
    val normalizedCumWeights = weights.map(_/sum).scanLeft(0.0d)(_ + _) 
    normalizedCumWeights.sliding(2).map { x => 
    randomSampleWithRange(x(0), x(1), seed) 
    }.toArray 
} 

注:彼らは1

に合計していない場合は分割の重み重みは、正規化されます上記の動作に基づいて、以下のようなサンプルスニペットを作成しました:

def getDoubleWeights(numparts:Int) : Array[Double] = { 
    Array.fill[Double](numparts)(1.0d) 
} 

発信者は次のようになるであろう....

val rddWithNumParts : Array[RDD] = yourRDD.randomSplit(getDoubleWeights(yourRDD.partitions.length)) 

この均一RDD

メモの数に分割する:同じことが以下DataFrame.randomSplit

に適用され
  • また、Dataframeにそれを変換するには、 RDD
    sqlContext.createDataFrame(rddOfRow, Schema)

後であなたは、このメソッドを呼び出すことができます。..たとえば以下のように使用します。

DataFrame [] randomSplit(double [] weights)指定された重みで DataFrameをランダムに分割します。あなたは(RDDに変換することができます)Iteratorを持っている各パーティションの

  • 私はパーティションの数に基づいて除されていた他の思考...

すなわちRDD.mapPartitionWithIndex(....)

に。あなたはパーティションの数= RDDの数のような何かを持つことができます

+0

しかし、それは常に2つのデータを右に分けるのですか? – AkhilaV

+0

重みの合計が1より大きい場合、正規化の一部として等しく分割されます。 –

+0

合計が1より大きい場合は重みとして 'rdd.partitions.length'を使用し、自動的に正規化して同じ数の重みに分割します。私は同じことをした。それはうまくいくはずです –

関連する問題