
New to Spark and Scala. Spark GraphX: requirement failed: Invalid initial capacity

For a hobby project, I am trying to run triangle count on this DataSet.

This is the code I have written so far:

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.Edge
    import org.apache.spark.graphx.Graph
    import org.apache.spark.graphx.Graph.graphToGraphOps
    import org.apache.spark.graphx.PartitionStrategy
    import org.apache.spark.rdd.RDD
    import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

    object GraphXApps {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("GraphXApps")
          .setSparkHome(System.getenv("SPARK_HOME"))
          .setJars(SparkContext.jarOfClass(this.getClass).toList)

        val sc = new SparkContext(conf)

        // Load the edges in canonical order and partition the graph for triangle count
        val edges: RDD[Edge[String]] =
          sc.textFile(args(0)).map { line =>
            val fields = line.split("\t")
            Edge(fields(0).toLong, fields(1).toLong)
          }

        val graph: Graph[String, String] = Graph
          .fromEdges(edges.sortBy(_.srcId, ascending = true, 1), "defaultProperty")
          .partitionBy(PartitionStrategy.RandomVertexCut)

        // Find the triangle count for each vertex
        val triCounts = graph.triangleCount().vertices
        val triCountById = graph.vertices.join(triCounts).map(_._2._2)

        // Print the result
        println(triCountById.collect().mkString("\n"))

        sc.stop()
      }
    }

But I am getting this error: java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity

Please tell me where I am going wrong. It would really help.

Full stack trace:

16/10/31 01:03:08 ERROR TaskSetManager: Task 0 in stage 8.0 failed 1 times; aborting job 
16/10/31 01:03:08 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 
16/10/31 01:03:08 INFO TaskSchedulerImpl: Cancelling stage 8 
16/10/31 01:03:08 INFO DAGScheduler: ShuffleMapStage 8 (mapPartitions at VertexRDDImpl.scala:245) failed in 0.131 s 
16/10/31 01:03:08 INFO DAGScheduler: Job 0 failed: collect at GraphXApps.scala:47, took 3.128921 s 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8, localhost): java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51) 
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57) 
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:70) 
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:69) 
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:154) 
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:892) 
    at GraphXApps$.main(GraphXApps.scala:47) 
    at GraphXApps.main(GraphXApps.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:51) 
    at org.apache.spark.util.collection.OpenHashSet$mcJ$sp.<init>(OpenHashSet.scala:57) 
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:70) 
    at org.apache.spark.graphx.lib.TriangleCount$$anonfun$5.apply(TriangleCount.scala:69) 
    at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$mapValues$2.apply(VertexRDDImpl.scala:102) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156) 
    at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:154) 
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332) 
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919) 
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910) 
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866) 
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910) 
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668) 
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

I haven't pinpointed the cause of the error yet, but I was able to run the same scenario successfully using GraphFrames. Would that solution be of interest? –

Answer


This looks like a bug in Spark 2.0 (so far I have tested this against 2.0, 2.0.1, and 2.0.2). The Jira [SPARK-18200]: GraphX Invalid initial capacity when running triangleCount was created to address this.

As noted in this linked notebook, your code works fine with Spark 1.6.

But as noted, it fails in Spark 2.0, as shown in this linked notebook.

In the meantime, as noted in this linked notebook, please try Spark 1.6 or give GraphFrames a try (a minimal sketch follows below).
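
In case it is useful, here is a minimal sketch of a GraphFrames version. Assumptions: the graphframes package is on the classpath, the input is the same tab-separated edge list, and the object name GraphFramesTriangles is just a placeholder:

    import org.apache.spark.sql.SparkSession
    import org.graphframes.GraphFrame

    object GraphFramesTriangles {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("GraphFramesTriangles").getOrCreate()

        // Read the tab-separated edge list into a DataFrame with the
        // column names GraphFrames expects: "src" and "dst".
        val edges = spark.read
          .option("sep", "\t")
          .csv(args(0))
          .toDF("src", "dst")

        // fromEdges infers the vertex DataFrame from the edge endpoints.
        val g = GraphFrame.fromEdges(edges)

        // Per-vertex triangle counts; run() returns the vertices with an
        // added "count" column.
        g.triangleCount.run().select("id", "count").show()

        spark.stop()
      }
    }

GraphFrames' triangleCount does not go through the GraphX code path shown in the stack trace, which is consistent with the comment above that the GraphFrames run succeeded.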

HTH!


Thank you for reporting this as a bug. People doing data science like me will find this very useful. Thanks for the help! – argumentopruvo


Glad to help! GraphFrames is updated regularly and keeps improving in performance (since it can take advantage of DataFrames). You may find that GraphFrames gives you both better performance and better ease of use. –
