2016-11-17 16 views
4

IDの行でいっぱいのファイルが与えられているとします。スパークにおけるIDの行の結合/結合

i1, i2, i5 
i3, i4 
i2, i6, i7 
i4, i8 
i9, i3 

どのように同じIDをリンクしてそれらに参加しますか?したがって、上記の例では、行1はi2を介して行3にリンクされ、行2はi4およびi3を介して行4および5にそれぞれリンクされます。これは私が行をループしていますが、機能的な方法でそれについて移動する方法を疑問に思ったことでそれを行うことができ、あなたに

i1, i2, i5, i6, i7 
i3, i4, i8, i9 

を、次の(重複削除)を与えるのだろうか?

+2

。com/questions/40240409/apache-spark-rdd-substitution/40256149#40256149 – maasg

答えて

1

を使用すると、Apacheはスパーク使用しているとして、あなたはあなたのために仕事をするためにGraphXコンポーネントで構築を使用することができます。

import org.apache.spark.graphx._ 

def cross[Y](xs: Traversable[Y], ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y) 

val data = sc.parallelize(List(
    "1\t5\t3", 
    "3\t9\t30", 
    "7\t10\t12", 
    "10\t7\t13" 
)) 

val prep = data.map(x => x.split("\t").map(_.toLong).toList) 

val vertex = prep 
    .flatMap(x => x) 
    .map(x => x -> s"ID=$x") 

val edges = prep 
    .map(x => cross(x, x)) 
    .flatMap(x => x) 
    .map(x => new Edge(x._1, x._2, "likes")) 

val graph = Graph(vertex, edges) 
val linked = graph 
    .connectedComponents 
    .vertices 
    .map(_.swap) 
    .groupByKey 

linked.take(10).foreach(println) 

は、次の結果を出力します:

(1,CompactBuffer(30, 3, 9, 1, 5)) 
(7,CompactBuffer(7, 10, 12, 13)) 

クロスは、単にので、我々はすべての頂点間のエッジを作成することができる二つのリストのクロス積を作成します。

connectedComponents関数は、グラフをたどり、エッジを共有するすべての頂点を見つけ、各頂点が頂点Id-> "Primary" VertexIDのタプルである新しいグラフを作成します。

ので:あなたは1と7は、「次頂点」として選ばれ、最初のグラフに接続されたすべての頂点にリンクされている見ることができるように

graph.connectedComponents.vertices.take(10).foreach(println) 

(30,1) 
(1,1) 
(3,1) 
(5,1) 
(7,7) 
(9,1) 
(10,7) 
(12,7) 
(13,7) 

をプリントアウトだろう。したがって、単純なスワップとグループはすべての接続IDを結合します。

0

のではなく、我々はKは、IDのあなたの番号であるO(n * k)あるソリューションを使用することができますすべての行をループO(n * n)溶液を使用。このように:

val input = ...//I will assume your input is an RDD[List] 

val idArray = Array(id1, id2, id3, id4, id5, id6, id6)//Array containing all IDs 
val result = sc.parallelize(idArray, k).map(x => (x, x)) 
input = input.map(x => (x(0), if(x.length > 0) x.slice(1, x.length) else null)) 

//If you can afford to persist it would help greatly: 
result.persist 
input.persist 

//We can make this loop smaller if k is large and your lists are small 
//by setting the upper bound of the range to the length of the longest list. 
//I'll leave this decision up to you. 
for (i <- 0 to k){ 
    result = result.cogroup(input) 
    input = input.map((t: (x, y)) => (y(0), if(y.length > 0) y.slice(1, y.length) else null)) 
} 
result.map((t: (x, y)) => y.distinct)//we want distinct lists in output 

result.unpersist 
input.unpersist 
0

これはおそらく最適ではありませんが、私はそれに関係なく投稿する価値があると考えました。これは、あなたの入力ファイルがメモリに永続化されるのに十分小さいことを前提としています(これはバニラのScalaなので)。

私は、指定されたIDをグラフ内の隣接関係として扱い、次にBFSを使用してすべての接続コンポーネントをリストすることでこれを解決することに決めました。次のように出力できます

/* Input, can be read from file easily by splitting on ", " */ 
val lines = List(List("i1", "i2", "i5"), 
    List("i3", "i4"), 
    List("i2", "i6", "i7"), 
    List("i4", "i8"), 
    List("i9", "i3")) 

/* finds all sequential pairs */ 
val pairs = lines.flatMap(x => x.dropRight(1).zip(x.drop(1))) 

/* create an empty adjacency map: id -> (Set of adjacent vertices) */ 
val defMap = Map[String, Set[String]]().withDefaultValue(Set[String]()) 

/* populate the default map with the actual (symmetric) adjacencies */ 
val adjMap = pairs.foldLeft{defMap}(
    (acc, x) => acc + (x._1 -> (acc(x._1) + x._2)) + (x._2 -> (acc(x._2) + x._1))) 

/* BFS algo on map representation of graph */ 
def mapBFS(adjMap: Map[String, Set[String]]): List[List[String]] = 
{ 
    val v = adjMap.keys 
    var globalVisits = List[String]() 
    def BFS_r(elems: List[String], visited: List[List[String]]): List[String] = 
    { 
     val newNeighbors = elems.flatMap(adjMap(_)).filterNot(visited.flatten.contains).distinct 
     if (newNeighbors.isEmpty) 
      visited.flatten 
     else 
      BFS_r(newNeighbors, newNeighbors :: visited) 
    } 
    v.flatMap(x =>{ 
     if (globalVisits.contains(x)) 
      None 
     else 
     { 
      val vi: List[String] = BFS_r(List(x), List(List(x))) 
      globalVisits = globalVisits ++ vi 
      Some(vi) 
     } 
    }).toList 
} 
mapBFS(adjMap).foreach{println} 

List(i7, i1, i6, i2, i5) 
List(i8, i4, i3, i9) 
+0

私はこのソリューションを[Code Review](http://codereview.stackexchange.com/questions/147446/print-connected-components-scala)に掲載しました。あなたが改善する方法についていくつかの考えを持っているならば)。 –

1

出力スパーク2.0+

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate; 
val df = spark.sparkContext.parallelize(
    List(
    "i1, i2, i5", 
    "i3, i4", 
    "i2, i6, i7", 
    "i4, i8") 
) 

//Group lines with tokens count (determing by the last occurence of comma) 
val rddGroupByTokensCount = df.map(row => (row.lastIndexOf(','), row.split(", "))) 
    .groupBy(_._1) 

//Now gather all the token to single place with flatMap and drop duplicates 
val rddUniqueTokens = rddGroupByTokensCount.map(_._2.flatMap(_._2).toSet) 

//print grouped unique tokens by the count in each line 
rddUniqueTokens.collect().map(println) 

で動作しますコード:のhttp:// stackoverflowのと同様に

Set(i5, i1, i7, i2, i6) 
Set(i3, i4, i8) 
関連する問題