2017-02-27 4 views
1

データセットではなく、方向性グラフで接続コンポーネントアルゴリズムを実行しようとしています。私は接続されたコンポーネントがエッジの両方向に横断することを望んでいません。Spark GraphxまたはGraphframeを使用して方向グラフを作成する方法

+---+--------------------+----+---------+ 
| id|  similar_hash|size|component| 
+---+--------------------+----+---------+ 
| 6|ceab58c59d23549d3f4b| 50|  1| 
| 5|e91898d39bf5f8501827| 55|  1| 
| 1|b4fcde907cbd290b7e16| 28|  1| 
| 3|1a6a6e3fd2daaeeb2a05| 65|  1| 
| 12|ceab58c59asd3549d...| 50|  12| 
| 2|cda389612d6b37674cb1| 27|  1| 
| 4|9a007eee210a47e58047| 42|  1| 
| 14|ceab508c59d23549d...| 55|  12| 
| 15|ceab508c59d23541d...| 51|  12| <- should be in separate cluster 
+---+--------------------+----+---------+ 

どのように私はこれを達成することができますしてください。

これは私のサンプルコード

import org.apache.log4j.{Level, LogManager} 
import org.apache.spark.SparkConf 
import org.apache.spark.graphx.Edge 
import org.apache.spark.sql._ 
import org.graphframes._ 

object CCTest { 

def main(args: Array[String]) { 

    val sparkConf = new SparkConf() 
     .setMaster("local[2]") 
    .setAppName("cc_test") 


    implicit val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() 

    val sc = sparkSession.sparkContext 

    val vertex = sparkSession.createDataFrame(sc.parallelize(Array(
    (1L, "b4fcde907cbd290b7e16", 28), 
    (2L, "cda389612d6b37674cb1", 27), 
    (3L, "1a6a6e3fd2daaeeb2a05", 65), 
    (4L, "9a007eee210a47e58047", 42), 
    (5L, "e91898d39bf5f8501827", 55), 
    (6L, "ceab58c59d23549d3f4b", 50), 
    (12L, "ceab58c59asd3549d3f4b", 50), 
    (14L, "ceab508c59d23549d3f4b", 55), 
    (15L, "ceab508c59d23541d3f4b", 51) 
))).toDF("id", "similar_hash", "size") 

    val edges = sparkSession.createDataFrame(sc.parallelize(Array(
      Edge(2L, 1L, 0.7f), 
      Edge(2L, 4L, 0.2f), 
      Edge(3L, 2L, 0.4f), 
      Edge(3L, 6L, 0.3f), 
      Edge(4L, 1L, 0.1f), 
      Edge(5L, 2L, 0.2f), 
      Edge(5L, 3L, 0.8f), 
      Edge(5L, 6L, 0.3f), 
      Edge(12L, 14L, 1.3f), 
      Edge(15L, 14L, 1.3f) //< - should not be connected except (14L, 15L) 
     ))).toDF("src", "dst", "attr") 


    val graph = GraphFrame(vertex, edges) 

    val cc = graph.connectedComponents.run() 

    cc.show() 



    sparkSession.stop() 

} 

} 

結果ですか?

+1

['stronglyConnectedComponents'](https://spark.apache.org/docs/latest/api/scala/index.html#[email protected](numIter:Int):org。 apache.spark.graphx.Graph [org.apache.spark.graphx.VertexId、ED])。 – zero323

+0

@ zero323 SCCは、私が間違っていれば私を修正してください。それは、すべてのノードが他のノードから到達可能である必要があるからです。しかし、基本的に、私は再帰的なSQLクエリのような再帰を行う必要があります。例えば : - > B、 B - > C、 M - > B 強くCC: セット= {A、B}、 セット= {B、C}、 セット= {m、b} 接続されたコンポーネント: ノード 'a - > b - > m 'に接続されているのはset =' {a、b、d、m} ' - しかし、再帰では、 a - > b - > c = {a、b、c} と m - > b = {m、b} これは{a、b、c}と{m、b}の2つの集合を返します。 –

+0

OK。なぜ '{a、b、c}'、 '{m、b、c}'ではなく{{m、b}}であるのか?あなたはそれのためにPregel APIが必要になるでしょう。 – zero323

答えて

0

私は誤解されるかもしれませんが、私の解決策はsource codeからconnectedComponentsに直接つながっています。 54行では、システム自体が唯一のプレゲルイテレータを呼び出して、キーラインは、単に、より適切なパラメータにEdgeDirectionを変更することにより

val pregelGraph = Pregel(ccGraph, initialMessage, maxIterations,EdgeDirection.Either)

ことで(hereを見つけることができます)、これはあなたのために働くことができます。

関連する問題