2017-02-09 8 views
0

Sparkでローカル外れ値ファクタを実装しようとしています。だから私はファイルから読み込んだポイントのセットを持っていて、各ポイントに対してN個の最も近いネイバーを見つける。各点は、私が2 RDDS を持っている今SparkでScalaを使用して2つのRDDに参加

のでzipWithIndexを使用して、それに与えられたインデックス()コマンドを持ってまず

RDD[(Index:Long, Array[(NeighborIndex:Long, Distance:Double)])] 
ロングはそのインデックスを表し、配列はロングとそのNの最近傍から構成さ

第二に、与えられた点から自分の距離を表すこれらの隣人のインデックス位置を表すとダブル

RDD[(Index:Long,LocalReachabilityDensity:Double)]

私が欲しいもの。ここ

は、ロング再び与えられた点のインデックスを表し、そしてダブル

そのローカル到達可能性の密度を表し、すべてのポイントが含まれているRDD、及びそのNの最も近い隣人の配列とそのですだから、基本的には、ここでローカル到達可能性密度

RDD[(Index:Long, Array[(NeighborIndex:Long,LocalReachabilityDensity:Double)])] 

は、ロングポイントのインデックスを表すことになり、かつ配列はそのインデックス値とローカル到達可能性の密度で、そのN最も近い隣人であろう。

私の理解によれば、最初のRDDでマップを実行し、その配列内の値をローカル到達可能性密度を含む2番目のRDDと結合し、与えられたすべてのインデックスのローカル到達可能密度を取得する必要があります。そのN人の隣人。しかし、私はこれを達成する方法がわかりません。いずれかが私を助けることができれば、それは与えられた偉大な

+0

最初にデカルトと結合し、最初にデカルトを結合し、遠方にフィルタリングします – BlackBear

答えて

1

次のようになります。

val rdd1: RDD[(index: Long, Array[(neighborIndex: Long, distance: Double)])] = ... 
val rdd2: RDD[(index: Long, localReachabilityDensity: Double)] = ... 

を私は本当にすべてでScalaのArrayを使用して好きではありません。私はまたあなたの抽象概念が交差目的であることを好まない。換言すれば、のrdd2は、rdd1の様々なエントリに埋め込まれています。これにより、物事を推論するのが難しくなります。また、最初のRDDに変換する際に2番目のRDDにアクセスできないSpark RDD APIの制限が発生します。私はあなたの現在の仕事を書き直して、より簡単な抽象を作り出すべきだと考えています。

しかし、あなたがしなければならない場合:

val flipped = rdd1.map { 
    case (index, array) => 
    array.map { 
     case (neighborIndex, distance) => (neighborIndex, (index, distance)) 
    }.elements.toVector 
}.flatMap(identity) 
.groupBy(_._1) 
val result = flipped.join(rdd2).mapValues { 
    case (indexDistances, localReachabilityDensity) => 
     indexDistances.map { 
     case (index, _) => (index, localReachabilityDensity) 
     }  
} 

基本的な考え方は、その後、私はとjoinを行うことができますPairRDDのキーとしてトップレベルにneighborIndex値を「抽出」しrdd1を反転することですrdd2ArrayVectorに置き換えてください。同じインデックス上で結合した後は、結合するほうがはるかに簡単です。

これは私の頭の上から外れていて、完璧ではないかもしれないことに注意してください。このアイデアは、コピーペーストのソリューションを提供するのではなく、むしろ異なる方向性を示唆しています。

+0

.headでエラーが発生しました value headはorg.apache.spark.rddのメンバーではありません。RDD [(Long、Double)] 私が理解しているように、頭を使ってフィルタ結果の最初の要素を取得し、次に2番目の要素(ローカル到達可能性密度)を取得しています。 代わりにfirst()を使用しようとしましたが、変換またはアクションを変換内で呼び出すことができないというエラーが発生しました... どのような解決策がありますか? –

+0

私はすべての値を格納するために頭を取り除くことにしました。そして、私は変換の中で変換を行うことができないということに間違いがありました。このコードにはmapValues内にエラーがあります 'org.apache.spark.SparkException:RDD変換とアクションはドライバによってのみ呼び出され、他の変換の内部では呼び出されません ' –

+0

私が表示するように' lookup'を試してみてください上記。 – Vidya

関連する問題