How do I load a Spark GraphX graph back from an objectFile? I run the following code in the spark-shell:

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.graphx.{Edge, Graph, GraphLoader, VertexId} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

val users: RDD[(VertexId, Map[String, String])] =
  sc.parallelize(Array(
    (1L, Map("a" -> "a")),
    (2L, Map("b" -> "b")),
    (3L, Map("c" -> "c")),
    (4L, Map("d" -> "d"))
  ))

val edgs: RDD[Edge[String]] =
  sc.parallelize(Array(
    Edge(1L, 2L, "1_2"),
    Edge(2L, 3L, "2_3"),
    Edge(3L, 1L, "3_1"),
    Edge(4L, 1L, "4_1"),
    Edge(4L, 3L, "4_3")
  ))

val graph = Graph(users, edgs)

graph.edges.saveAsObjectFile("/Users/test/edges")
graph.vertices.saveAsObjectFile("/Users/test/vertices")

val vertices = sc.objectFile[(VertexId, Map[String, String])]("/Users/test/edges")
val edges = sc.objectFile[Edge[String]]("/Users/test/vertices")

val un = Graph(vertices, edges)

un.edges.foreach(println)

This fails with the error below. My Spark version is 2.1.1.

Caused by: java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge 
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107) 
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) 
    at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

So the error makes it look as though the saved object files cannot be turned back into a graph. My question is: how do I save a GraphX graph as object files and reload it?

Answer


I made a silly mistake: I was loading the edges from the wrong file. In the code above, the two paths are swapped, so the vertex pairs are read from /Users/test/edges and the Edge values from /Users/test/vertices. The edge RDD therefore ends up holding scala.Tuple2 values, which cannot be cast to org.apache.spark.graphx.Edge, hence the exception. Reading each RDD back from the directory it was actually saved to works without problems.
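
For reference, here is the corrected reload with each path pointing at the file its RDD was written to (same spark-shell session and paths as in the question; the variable name reloaded is just for illustration):

// Read each RDD back from the directory it was actually written to.
val vertices = sc.objectFile[(VertexId, Map[String, String])]("/Users/test/vertices")
val edges = sc.objectFile[Edge[String]]("/Users/test/edges")

// The edge partitions now really contain Edge[String] values,
// so building and printing the graph succeeds.
val reloaded = Graph(vertices, edges)
reloaded.edges.foreach(println)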
