2017-01-30 6 views
0

私はノードの近傍をとる関数を持っています。近隣のために私はブロードキャスト変数とノード自体のIDを使用し、そのノードの近さの中心を計算します。グラフの各ノードをその関数の結果と対応させます。私がタスクマネージャを開くと、CPUは並列で動作していないかのように全く利用されません。メモリも同じですが、すべてのノードが並列に関数を実行しますまた、データが大きく、完了するまでに時間がかかります。そのようなリソースが必要でないようではありません。すべてのサポートは本当に感謝しています、ありがとうございます。 は私がややあなたの元の質問への答えの提供するためにval graph = GraphLoader.edgeListFile(sc, path).cacheSpark、GraphxプログラムはCPUとメモリを使用していません

object ClosenessCentrality { 

    case class Vertex(id: VertexId) 

    def run(graph: Graph[Int, Float],sc: SparkContext): Unit = { 
    //Have to reverse edges and make graph undirected because is bipartite 
    val neighbors = CollectNeighbors.collectWeightedNeighbors(graph).collectAsMap() 
    val bNeighbors = sc.broadcast(neighbors) 

    val result = graph.vertices.map(f => shortestPaths(f._1,bNeighbors.value)) 
    //result.coalesce(1) 
    result.count() 

    } 

    def shortestPaths(source: VertexId, neighbors: Map[VertexId, Map[VertexId, Float]]): Double ={ 
    val predecessors = new mutable.HashMap[VertexId, ListBuffer[VertexId]]() 
    val distances = new mutable.HashMap[VertexId, Double]() 
    val q = new FibonacciHeap[Vertex] 
    val nodes = new mutable.HashMap[VertexId, FibonacciHeap.Node[Vertex]]() 

    distances.put(source, 0) 

    for (w <- neighbors) { 
     if (w._1 != source) 
     distances.put(w._1, Int.MaxValue) 

     predecessors.put(w._1, ListBuffer[VertexId]()) 
     val node = q.insert(Vertex(w._1), distances(w._1)) 
     nodes.put(w._1, node) 
    } 

    while (!q.isEmpty) { 
     val u = q.minNode 
     val node = u.data.id 
     q.removeMin() 
     //discover paths 
     //println("Current node is:"+node+" "+neighbors(node).size) 
     for (w <- neighbors(node).keys) { 
     //print("Neighbor is"+w) 
     val alt = distances(node) + neighbors(node)(w) 
//  if (distances(w) > alt) { 
//   distances(w) = alt 
//   q.decreaseKey(nodes(w), alt) 
//  } 
//  if (distances(w) == alt) 
//   predecessors(w).+=(node) 
     if(alt< distances(w)){ 
      distances(w) = alt 
      predecessors(w).+=(node) 
      q.decreaseKey(nodes(w), alt) 
     } 

     }//For 
    } 
    val sum = distances.values.sum 
    sum 
    } 
+0

をクラスタ上でプログラムを起動したりとlocalYましたか? ローカルマスタの場合は、使用するコアの数を指定しました。これは、 '--master = local [8]'のようになります。また、データセットにはいくつのパーティションがありますか?単一のパーティションしか持たない場合、単一のコアが使用されます。 –

+0

はい、私は他のプログラムでは、より多くのリソースを使用していました。私はエッジリストファイルからグラフをロードするパーティションが私は、デフォルトを残しましたが、私はそれを考え、私は8コア私はより多くを使うべきですか、私はこれを間違ってやっていますか? – user3224454

+0

いくつかのコードを提供できますか? –

答えて

1

を使用するグラフをロードするために、私はこのように処理するために、単一のコアを使用して、あなたのRDDは、単一のパーティションのみを持っていると思われます。

edgeListFileメソッドには、必要なパーティションの最小数を指定する引数があります。 また、repartitionを使用してさらに多くのパーティションを取得できます。あなたは​​3210をmentionnedだけ、デフォルトでは、パーティションの数を減らすこと

、この質問を参照してください。Spark Coalesce More Partitions

+0

これで動作しますが、パーティションの数はどのくらいですか?CPUのコア数に基づいて選択する方法を教えてください。 – user3224454

+0

少なくとも、コアと同じ数のパーティションが必要です。しかし、私は、各パーティションが十分小さいことを保証するために、それ以上のものを持つことを強くお勧めします(特に、すべての頂点にグラフ全体を追跡させたい場合)。私のアドバイスは:テット、テスト、さらにテスト、あなたがスイートスポットを見つけることができるかどうかを確認することです。 –

+0

それと同じように、各パーティションのサイズを小さくする(パーティションの数を増やす)ことは、100%のケースで正しい方法ではないようです。パーティションあたりのオーバーヘッドは単に無視するには大きすぎますので、それらの間にバランスが取れていることを確認してください。とにかく中小規模の仕事には、テスト(とテストの詳細)アドバイスが十分です。 – dennlinger

関連する問題