2016-03-29 6 views
1

特定のノードに接続されている間接ノードを探したいと思います。 私は以下のようなグラフの連結成分のクラスを使用してみました...Spark Graphxの特定のノードに接続されている間接ノードを見つける方法

graph.connectedComponents 

しかし、それは私が特定のノードのために必要なすべてのgraph..butのために与えています。

私も以下のようにしてみました。

graph.edges.filter(_.srcId == x).map(_.dstId) 

これは特定のノードの直接ノードを与えます。これはRDD操作のみを使用して再帰的に行う必要があります。 これについて助力してもらえますか?

答えて

1

はこのような何かを試してみてください:

graph.edges.filter(_.srcId == x).map(e => (e.dstId, null)).join(
    graph.collectNeighborIds(EdgeDirection.Either) 
).flatMap{t => t._2._2}.collect.toSet 

あなたはこれよりも深くに行きたい場合は、私はPregel APIのようなものを使用します。基本的には、ノード間でメッセージを繰り返し送信し、結果を集約することができます。

編集:プレゲルソリューション

私は最終的に自分自身で停止するように反復を得ました。 編集以下のとおりです。接続されたノードのすべてのVertexIdsの配列 - 私たちはタイプArray[Long]のメッセージを送信しようとしている

graph.vertices.collect 
res46: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array()), (8,Array()), (1,Array()), (9,Array()), (5,Array()), (6,Array()), (2,Array()), (3,Array()), (7,Array()))  

graph.edges.collect 
res47: Array[org.apache.spark.graphx.Edge[Double]] = Array(Edge(1,2,0.0), Edge(2,3,0.0), Edge(3,4,0.0), Edge(5,6,0.0), Edge(6,7,0.0), Edge(7,8,0.0), Edge(8,9,0.0), Edge(4,2,0.0), Edge(6,9,0.0), Edge(7,9,0.0)) 

:このグラフを考えます。メッセージはアップストリームに送信されます。dstは、のVertexIdを他のすべてのダウンストリームVertexIdsと一緒に送信します。アップストリームノードが接続をすでに知っている場合は、メッセージは送信されません。結局、すべてのノードは接続されているすべてのノードを認識し、それ以上メッセージは送信されません。

まず、vprogを定義します。ドキュメントによれば:

各頂点上で実行され、 インバウンドメッセージを受信し、新たな頂点値を計算するユーザー定義の頂点プログラム。最初の 反復では、すべての頂点で頂点プログラムが呼び出され、 デフォルトメッセージが渡されます。その後の反復では、頂点プログラムは で、メッセージを受信する頂点でのみ呼び出されます。

def vprog(id: VertexId, orig: Array[Long], newly: Array[Long]) : Array[Long] = { 
    (orig ++ newly).toSet.toArray 
} 

その後、我々は我々のsendMsg定義 - 編集:は が現在の反復でメッセージを受け取った頂点のうちエッジに適用されるユーザー指定の関数src & dst

を入れ替え

def sendMsg(trip: EdgeTriplet[Array[Long],Double]) : Iterator[(VertexId, Array[Long])] = { 
    if (trip.srcAttr.intersect(trip.dstAttr ++ Array(trip.dstId)).length != (trip.dstAttr ++ Array(trip.dstId)).toSet.size) { 
    Iterator((trip.srcId, (Array(trip.dstId) ++ trip.dstAttr).toSet.toArray)) 
    } else Iterator.empty } 

次の私たちのmergeMsg

この機能は 可換連想との理想的な大きさでなければならないタイプの2つの受信メッセージ を取り、型Aの単一のメッセージにそれらをマージし、ユーザー供給機能Aは 増加してはいけません。

def mergeMsg(a: Array[Long], b: Array[Long]) : Array[Long] = { 
    (a ++ b).toSet.toArray 
} 

はその後、我々はpregel実行 - 編集:

残念ながら、我々は上記の最後の文では、ルールを破るつもりInt.MaxValue

val result = graph.pregel(Array[Long]())(vprog, sendMsg, mergeMsg) 
maxIterations、デフォルトを削除

あなたは結果を見ることができます:

result.vertices.collect 
res48: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array(4, 2, 3)), (8,Array(8, 9)), (1,Array(1, 2, 3, 4)), (9,Array(9)), (5,Array(5, 6, 9, 7, 8)), (6,Array(6, 7, 9, 8)), (2,Array(2, 3, 4)), (3,Array(3, 4, 2)), (7,Array(7, 8, 9))) 
+0

それはまったく単純な問題ではありません。あなたが見てみるべき 'aggregateMessages' APIと' Pregel'というアルゴリズムがあります。基本的には、ノード間でメッセージを繰り返し送信し、結果を集約することができます。コメントで説明するのは難しいですが、アルゴリズムの最初の反復では、ノード7はノード2に(8,9)を送り、ノード8はノード7に(10,11)を送ります。 2番目の反復では、ノード7はノード2に(10,11)を送信します。したがって、2回のPregelの繰り返しがあります。 –

+0

確かにDavid ...私はそれで動作します...そしてあなたを更新します...ありがとう – Devndra

+0

だから、素晴らしい - 今受け入れ、私の答えをupvote! –

関連する問題