2017-05-12 10 views
0

からの連続インデックスの値を読んでください、私は、Sparkスカラ座での問題を抱えているシリーズのキーから最初の値を取得し、私はこのような新しいRDDを作成します。はどのように私は火花RDD

[(a,1),(a,2),(a,3),(a,4),(b,1),(b,2),(a,3),(a,4),(a,5),(b,8),(b,9)] 

私が取得したいですコメントで述べたように、私はRDD

+0

スパークは、あなたのコレクションの順序を維持しないので、それはあなたがあなたの既存のデータ構造を行うことができない何か。データをソートしてウィンドウ関数を使用するためのキーを導入する必要があります:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html – jamborta

+0

あなたは何をしようとしているのですか?あなたが取ってくるシリーズを説明できますか? –

+0

@SumeetSharma彼は、同じキーを持つ連続したキューブの各グループの最初のカップルだけを保持したいと思うが、RDDはその順序を保持しないので、グループは最初のシーケンスと異なるだろう – Matt

答えて

1

からのScalaでこれを行うことができ、RDDの要素の順序を使用することができるようにするために、あなたが持っていると思いますどのように

[(a,1),(b,1),(a,3),(b,8)] 

:このような結果何とかこれを表すoデータそのもののrder。そのためには、正確にzipWithIndexが作成されました。インデックスがデータに追加されました。その後、いくつかの操作(修正指標とRDDのjoin)で我々はあなたが必要なものを得ることができます。

// add index to RDD: 
val withIndex = rdd.zipWithIndex().map(_.swap) 

// create another RDD with indices increased by one, to later join each element with the previous one 
val previous = withIndex.map { case (index, v) => (index + 1, v) } 

// join RDDs, filter out those where previous "key" is identical 
val result = withIndex.leftOuterJoin(previous).collect { 
    case (i, (left, None)) => (i, left) // keep first element in RDD 
    case (i, (left, Some((key, _)))) if left._1 != key => (i, left) // keep only elements where the previous key is different 
}.sortByKey().values // if you want to preserve the original order... 

result.collect().foreach(println) 
// (a,1) 
// (b,1) 
// (a,3) 
// (b,8) 
+0

Tzach Zoharに感謝します。これは私が必要とする正しい答えです。ありがとうございました! – lee

+0

喜んで助けてください;有益な答えを示す最善の方法は、それを(それの隣にある緑のVマークを使って)受け入れるか、それをupvoteすることです;) –

関連する問題