はこのような何かを試してみてください:
graph.edges.filter(_.srcId == x).map(e => (e.dstId, null)).join(
graph.collectNeighborIds(EdgeDirection.Either)
).flatMap{t => t._2._2}.collect.toSet
あなたはこれよりも深くに行きたい場合は、私はPregel APIのようなものを使用します。基本的には、ノード間でメッセージを繰り返し送信し、結果を集約することができます。
編集:プレゲルソリューション
私は最終的に自分自身で停止するように反復を得ました。 編集以下のとおりです。接続されたノードのすべてのVertexIds
の配列 - 私たちはタイプArray[Long]
のメッセージを送信しようとしている
graph.vertices.collect
res46: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array()), (8,Array()), (1,Array()), (9,Array()), (5,Array()), (6,Array()), (2,Array()), (3,Array()), (7,Array()))
graph.edges.collect
res47: Array[org.apache.spark.graphx.Edge[Double]] = Array(Edge(1,2,0.0), Edge(2,3,0.0), Edge(3,4,0.0), Edge(5,6,0.0), Edge(6,7,0.0), Edge(7,8,0.0), Edge(8,9,0.0), Edge(4,2,0.0), Edge(6,9,0.0), Edge(7,9,0.0))
:このグラフを考えます。メッセージはアップストリームに送信されます。dst
は、のVertexId
を他のすべてのダウンストリームVertexIds
と一緒に送信します。アップストリームノードが接続をすでに知っている場合は、メッセージは送信されません。結局、すべてのノードは接続されているすべてのノードを認識し、それ以上メッセージは送信されません。
まず、vprog
を定義します。ドキュメントによれば:
各頂点上で実行され、 インバウンドメッセージを受信し、新たな頂点値を計算するユーザー定義の頂点プログラム。最初の 反復では、すべての頂点で頂点プログラムが呼び出され、 デフォルトメッセージが渡されます。その後の反復では、頂点プログラムは で、メッセージを受信する頂点でのみ呼び出されます。
def vprog(id: VertexId, orig: Array[Long], newly: Array[Long]) : Array[Long] = {
(orig ++ newly).toSet.toArray
}
その後、我々は我々のsendMsg
定義 - 編集:は が現在の反復でメッセージを受け取った頂点のうちエッジに適用されるユーザー指定の関数src
& dst
を入れ替え
def sendMsg(trip: EdgeTriplet[Array[Long],Double]) : Iterator[(VertexId, Array[Long])] = {
if (trip.srcAttr.intersect(trip.dstAttr ++ Array(trip.dstId)).length != (trip.dstAttr ++ Array(trip.dstId)).toSet.size) {
Iterator((trip.srcId, (Array(trip.dstId) ++ trip.dstAttr).toSet.toArray))
} else Iterator.empty }
次の私たちのmergeMsg
:
この機能は 可換連想との理想的な大きさでなければならないタイプの2つの受信メッセージ を取り、型Aの単一のメッセージにそれらをマージし、ユーザー供給機能Aは 増加してはいけません。
def mergeMsg(a: Array[Long], b: Array[Long]) : Array[Long] = {
(a ++ b).toSet.toArray
}
はその後、我々はpregel
実行 - 編集:
残念ながら、我々は上記の最後の文では、ルールを破るつもりInt.MaxValue
val result = graph.pregel(Array[Long]())(vprog, sendMsg, mergeMsg)
に
maxIterations
、デフォルトを削除
あなたは結果を見ることができます:
result.vertices.collect
res48: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array(4, 2, 3)), (8,Array(8, 9)), (1,Array(1, 2, 3, 4)), (9,Array(9)), (5,Array(5, 6, 9, 7, 8)), (6,Array(6, 7, 9, 8)), (2,Array(2, 3, 4)), (3,Array(3, 4, 2)), (7,Array(7, 8, 9)))
それはまったく単純な問題ではありません。あなたが見てみるべき 'aggregateMessages' APIと' Pregel'というアルゴリズムがあります。基本的には、ノード間でメッセージを繰り返し送信し、結果を集約することができます。コメントで説明するのは難しいですが、アルゴリズムの最初の反復では、ノード7はノード2に(8,9)を送り、ノード8はノード7に(10,11)を送ります。 2番目の反復では、ノード7はノード2に(10,11)を送信します。したがって、2回のPregelの繰り返しがあります。 –
確かにDavid ...私はそれで動作します...そしてあなたを更新します...ありがとう – Devndra
だから、素晴らしい - 今受け入れ、私の答えをupvote! –