2017-06-13 9 views
0

ソートされたRDD [String]からいくつかの要素を抽出しようとしています。私は "zipWithIndex"を試して、残りの部分でRDDをゼロにすることを試みました。ソートされたRDDをn個の部分に分割し、各部分から最初の要素を取得する方法は?

val expectedSize = 165 
val n = rddOfStrings.count/expectedSize 

val resultArray = rddOfStrings.sortBy(x => x).zipWithIndex.filter(x => x._2 % n == 0).map(_._1).collect 

"n"は必ずしも整数ではありません。 double型の場合、resultArrayのサイズはexpectedSizeと等しくなりません(+1または-1を生成します)。同じサイズのコレクションを返すにはどうすればいいですか?

P.S.私はすべてのエグゼキュータにコレクションオブジェクトを渡して、スパークアキュムレータを試しました。非常に大きなデータセットのため、失敗しました。 165部を等しくすることはできません

+0

は何ですかあなたの目標はここですか?データセットからサンプルを取得するだけです。実際に使用できるサンプルメソッドがあります。 – puhlen

+0

私は、rdd(ソート済み)を165等分し、それぞれの部分から最初の(頭の)要素を取ります。 – sen

答えて

0

、本当に - そのうちのいくつかは、合計サイズが「できるだけ均等に分散し、」これらの部品を得るために165

の乗算ではないと仮定すると、他よりも大きくする必要があります、あなたはその後、containsを使用してRDDをフィルタリングし、あなたは後にしている要素のインデックスを取得するには、n非丸めを使用n, 2n, 3n, ...のストリームを作成し、そのストリーム内の各要素ラウンドすることができます

val expectedSize = 165 
val n: Double = rddOfStrings.count.toDouble/expectedSize 

val indices = Stream.iterate(0D)(x => x + n) 
    .map(math.round) 
    .take(expectedSize) 
    .toList 

val resultArray = rddOfStrings.sortBy(x => x) 
    .zipWithIndex 
    .filter(x => indices.contains(x._2)) 
    .map(_._1) 
    .collect 
+0

完璧!ちょうど私が望んでいたように... – sen

関連する問題