からの連続インデックスの値を読んでください、私は、Sparkスカラ座での問題を抱えているシリーズのキーから最初の値を取得し、私はこのような新しいRDDを作成します。はどのように私は火花RDD
[(a,1),(a,2),(a,3),(a,4),(b,1),(b,2),(a,3),(a,4),(a,5),(b,8),(b,9)]
私が取得したいですコメントで述べたように、私はRDD
からの連続インデックスの値を読んでください、私は、Sparkスカラ座での問題を抱えているシリーズのキーから最初の値を取得し、私はこのような新しいRDDを作成します。はどのように私は火花RDD
[(a,1),(a,2),(a,3),(a,4),(b,1),(b,2),(a,3),(a,4),(a,5),(b,8),(b,9)]
私が取得したいですコメントで述べたように、私はRDD
からのScalaでこれを行うことができ、RDDの要素の順序を使用することができるようにするために、あなたが持っていると思いますどのように
[(a,1),(b,1),(a,3),(b,8)]
:このような結果何とかこれを表すoデータそのもののrder。そのためには、正確にzipWithIndex
が作成されました。インデックスがデータに追加されました。その後、いくつかの操作(修正指標とRDDのjoin
)で我々はあなたが必要なものを得ることができます。
// add index to RDD:
val withIndex = rdd.zipWithIndex().map(_.swap)
// create another RDD with indices increased by one, to later join each element with the previous one
val previous = withIndex.map { case (index, v) => (index + 1, v) }
// join RDDs, filter out those where previous "key" is identical
val result = withIndex.leftOuterJoin(previous).collect {
case (i, (left, None)) => (i, left) // keep first element in RDD
case (i, (left, Some((key, _)))) if left._1 != key => (i, left) // keep only elements where the previous key is different
}.sortByKey().values // if you want to preserve the original order...
result.collect().foreach(println)
// (a,1)
// (b,1)
// (a,3)
// (b,8)
Tzach Zoharに感謝します。これは私が必要とする正しい答えです。ありがとうございました! – lee
喜んで助けてください;有益な答えを示す最善の方法は、それを(それの隣にある緑のVマークを使って)受け入れるか、それをupvoteすることです;) –
スパークは、あなたのコレクションの順序を維持しないので、それはあなたがあなたの既存のデータ構造を行うことができない何か。データをソートしてウィンドウ関数を使用するためのキーを導入する必要があります:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html – jamborta
あなたは何をしようとしているのですか?あなたが取ってくるシリーズを説明できますか? –
@SumeetSharma彼は、同じキーを持つ連続したキューブの各グループの最初のカップルだけを保持したいと思うが、RDDはその順序を保持しないので、グループは最初のシーケンスと異なるだろう – Matt