How do I load a GraphX graph back from an objectFile in Spark? I run my code in the spark-shell like this:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, GraphLoader, VertexId}
import org.apache.spark.rdd.RDD

// Build a small test graph: Map-valued vertex attributes, String edge attributes.
val users: RDD[(VertexId, Map[String, String])] =
  sc.parallelize(Array(
    (1L, Map("a" -> "a")),
    (2L, Map("b" -> "b")),
    (3L, Map("c" -> "c")),
    (4L, Map("d" -> "d"))
  ))

val edgs: RDD[Edge[String]] =
  sc.parallelize(Array(
    Edge(1L, 2L, "1_2"),
    Edge(2L, 3L, "2_3"),
    Edge(3L, 1L, "3_1"),
    Edge(4L, 1L, "4_1"),
    Edge(4L, 3L, "4_3")
  ))

val graph = Graph.apply(users, edgs)

// Save both RDDs as Hadoop object files.
graph.edges.saveAsObjectFile("/Users/test/edges")
graph.vertices.saveAsObjectFile("/Users/test/vertices")

// Reload them and rebuild the graph.
val vertices = sc.objectFile[(VertexId, Map[String, String])]("/Users/test/edges")
val edges = sc.objectFile[Edge[String]]("/Users/test/vertices")

val un = Graph.apply(vertices, edges)
un.edges.foreach(println)
This fails with the error below; my Spark version is 2.1.1:
Caused by: java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge
at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
So it looks as if the object files cannot be read back into a graph. My question is: how do I save a GraphX graph as object files and reload it?
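For what it's worth, in the snippet above the two objectFile calls appear to read from swapped paths: vertices is read from "/Users/test/edges" and edges from "/Users/test/vertices". That would match the scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge error, since the vertex file contains Tuple2 records. A minimal sketch of the round trip with each path matched to its type (assuming the same spark-shell session and the files written above; loadedVertices/loadedEdges are just illustrative names):

// Read each RDD back from the path it was actually saved to.
val loadedVertices: RDD[(VertexId, Map[String, String])] =
  sc.objectFile[(VertexId, Map[String, String])]("/Users/test/vertices")
val loadedEdges: RDD[Edge[String]] =
  sc.objectFile[Edge[String]]("/Users/test/edges")

// Rebuild the graph from the reloaded RDDs and print the edges.
val reloaded = Graph(loadedVertices, loadedEdges)
reloaded.edges.foreach(println)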