Error while running SparkApp locally or on YARN

We have run into a problem where our Spark application behaves differently depending on whether it is submitted with --master local[*] or --master yarn. Submitted locally, the application runs fine. Started on YARN it also works at first, but after the first 5-7 stages it fails with the following error:

WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 210, quickstart.cloudera): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174) 
    ... 11 more 

[Stage 7:>               (0 + 1)/2]16/07/22 07:34:53 ERROR TaskSetManager: Task 1 in stage 7.0 failed 4 times; aborting job 
16/07/22 07:34:53 ERROR InsertIntoHadoopFsRelation: Aborting job. 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 7.0 failed 4 times, most recent failure: Lost task 1.3 in stage 7.0 (TID 214, quickstart.cloudera): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174) 
    ... 11 more 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1282) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1281) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1281) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1507) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1469) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:325) 
    at dataCreator.DataSetGenerator$$anonfun$createVP$1.apply(DataSetGenerator.scala:160) 
    at dataCreator.DataSetGenerator$$anonfun$createVP$1.apply(DataSetGenerator.scala:144) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at dataCreator.DataSetGenerator$.createVP(DataSetGenerator.scala:144) 
    at dataCreator.DataSetGenerator$.generateDataSet(DataSetGenerator.scala:78) 
    at runDriver$.main(runDriver.scala:14) 
    at runDriver.main(runDriver.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174) 
    ... 11 more 
16/07/22 07:34:53 ERROR DefaultWriterContainer: Job job_201607220734_0000 aborted. 
Exception in thread "main" org.apache.spark.SparkException: Job aborted. 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:325) 
    at dataCreator.DataSetGenerator$$anonfun$createVP$1.apply(DataSetGenerator.scala:160) 
    at dataCreator.DataSetGenerator$$anonfun$createVP$1.apply(DataSetGenerator.scala:144) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
    at dataCreator.DataSetGenerator$.createVP(DataSetGenerator.scala:144) 
    at dataCreator.DataSetGenerator$.generateDataSet(DataSetGenerator.scala:78) 
    at runDriver$.main(runDriver.scala:14) 
    at runDriver.main(runDriver.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 7.0 failed 4 times, most recent failure: Lost task 1.3 in stage 7.0 (TID 214, quickstart.cloudera): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174) 
    ... 11 more 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1294) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1282) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1281) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1281) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1507) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1469) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) 
    ... 34 more 
Caused by: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1177) 
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: Failed to get broadcast_8_piece0 of broadcast_8 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) 
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1174) 
    ... 11 more 

Here is what we think is the interesting part of the code: if the method handleTypes is not called, the app runs without any error. handleTypes is the only place in this code where we drop down to the RDD API and map every row through extractObj.

    /**
     * Generates a VP table for each unique predicate in the input RDF dataset.
     * All tables have to be cached, since they are used for the generation of
     * ExtVP tables.
     */
    private def createVP() = {
      // create directory for all VP tables
      Helper.removeDirInHDFS(Settings.vpDir)
      Helper.createDirInHDFS(Settings.vpDir)
      StatisticWriter.initNewStatisticFile("VP")

      // create and cache VP tables for all predicates in the input RDF dataset
      for (predicate <- _uPredicates) {
        var vpTable = _sqlContext.sql(
          "select sub, obj from triples where pred='" + predicate + "'")

        val cleanPredicate = Helper.getPartName(predicate)

        // --> without this call no error occurs <--
        vpTable = handleTypes(vpTable, predicate)

        vpTable.registerTempTable(cleanPredicate)
        _sqlContext.cacheTable(cleanPredicate)
        _vpTableSizes(predicate) = vpTable.count()

        //vpTable.saveAsParquetFile(Settings.vpDir + cleanPredicate + ".parquet")
        vpTable.write.parquet(Settings.vpDir + cleanPredicate + ".parquet")

        // print statistic line
        StatisticWriter.incSavedTables()
        StatisticWriter.addTableStatistic("<" + predicate + ">", -1, _vpTableSizes(predicate))

        writeLoadScript(Settings.vpDir + cleanPredicate + ".parquet", cleanPredicate, "", vpTable)
      }

      StatisticWriter.closeStatisticFile()
    }


    private def handleTypes(vTable: DataFrame, predIn: String) = {
      var pred = predIn
      //println("'" + pred + "'")

      if (pred.startsWith("<") && pred.endsWith(">")) {
        pred = pred.substring(1, pred.length() - 1)
      }

      var newSchema = StructType(StructField("sub", StringType, false)
        :: StructField("obj", StringType, false) :: Nil)
      var predType = ""

      // type check of object
      if (integerPred.contains(pred)) {
        newSchema = StructType(StructField("sub", StringType, false)
          :: StructField("obj", IntegerType, true) :: Nil)
        predType = "int"
      } else if (doublePred.contains(pred)) {
        newSchema = StructType(StructField("sub", StringType, false)
          :: StructField("obj", DoubleType, false) :: Nil)
        predType = "double"
      } else if (datePred.contains(pred)) {
        newSchema = StructType(StructField("sub", StringType, false)
          :: StructField("obj", DateType, false) :: Nil)
        predType = "date"
      } else if (dateTimePred.contains(pred)) {
        newSchema = StructType(StructField("sub", StringType, false)
          :: StructField("obj", TimestampType, false) :: Nil)
        predType = "timestamp"
      }

      var newRdd = vTable.rdd
      newRdd = newRdd.map(r => extractObj(r, predType))
      val newDF = _sqlContext.createDataFrame(newRdd, newSchema)
      newDF
    }

    private def extractObj(r: Row, predType: String) = {
      val pattern = new Regex("(?<=\").*?(?=\")")
      val obj = r.getString(1)
      var result = obj

      if (obj.contains("^^")) {
        result = pattern.findFirstIn(obj).get
        if (predType.equals("timestamp")) {
          result = result.replace("T", " ")
        }
      }

      var result2 = Row(r.getString(0), result)

      if (predType.equals("int")) {
        result2 = Row(r.getString(0), result.toInt)
      } else if (predType.equals("double")) {
        result2 = Row(r.getString(0), result.toDouble)
      } else if (predType.equals("date")) {
        result2 = Row(r.getString(0), getDate(result))
      } else if (predType.equals("timestamp")) {
        result2 = Row(r.getString(0), getTimestamp(result))
      }

      result2
    }

    def getDate(x: Any): java.sql.Date = {
      val format = new SimpleDateFormat("yyyy-MM-dd")
      if (x.toString() == "")
        null
      else {
        val d = format.parse(x.toString())
        // use the epoch milliseconds (getTime), not the deprecated
        // day-of-month (getDate), to construct the java.sql.Date
        new Date(d.getTime())
      }
    }

    def getTimestamp(x: Any): java.sql.Timestamp = {
      val format = new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss")
      if (x.toString() == "")
        null
      else {
        val d = format.parse(x.toString())
        new Timestamp(d.getTime())
      }
    }

    def writeLoadScript(path: String, tableName: String, relType: String, table: DataFrame) = {
      val relationType = relType.toUpperCase
      val columnList = table.schema.toSeq
      val subType = columnList(0).dataType.simpleString
      val objType = columnList(1).dataType.simpleString
      val fw = new java.io.FileWriter(_loadScriptName, true)
      try {
        if (tableName == "triples") {
          fw.write("DROP TABLE IF EXISTS triples;\n")
          fw.write("CREATE EXTERNAL TABLE triples (sub STRING, pred STRING, obj STRING)\n")
          fw.write("STORED AS PARQUET LOCATION '${hiveconf:prepath}" + path + "';\n\n")
        } else {
          fw.write("DROP TABLE IF EXISTS " + relationType + _delimiter + tableName + ";\n")
          fw.write("CREATE EXTERNAL TABLE " + relationType + _delimiter + tableName
            + " (sub " + subType + ", obj " + objType + ")\n")
          fw.write("STORED AS PARQUET LOCATION '${hiveconf:prepath}" + path + "';\n\n")
        }
      }
      finally fw.close()
    }

    def initLoadScript() = {
      val fw = new java.io.FileWriter(_loadScriptName, false)
      try {
        fw.write("-- Hive 1.2.0 or later is needed! \n")
      }
      finally fw.close()
    }

We run this on Spark 1.5.0-cdh5.5.2 with Hadoop 2.6.0-cdh5.5.2 and on Spark 1.6.1 with Hadoop 2.7.1, as well as on the Cloudera Quickstart VM. Please let us know if there is anything we should look into! The error seems to be related to the open Spark issue SPARK-5594.

Thanks!

Answer


https://issues.apache.org/jira/browse/SPARK-5594

As recommended in this post on the issue, we serialized the extractObj function. The code now looks like this:

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import org.apache.spark.sql.Row
import scala.util.matching.Regex

object FunctionSerializable extends Serializable {

    def extractObj(r: Row, predType: String): Row = {
      val pattern = new Regex("(?<=\").*?(?=\")")
      val obj = r.getString(1)
      var result = obj

      if (obj.contains("^^")) {
        result = pattern.findFirstIn(obj).get
        if (predType.equals("timestamp")) {
          result = result.replace("T", " ")
        }
      }

      var result2 = Row(r.getString(0), result)

      if (predType.equals("int")) {
        result2 = Row(r.getString(0), result.toInt)
      } else if (predType.equals("double")) {
        result2 = Row(r.getString(0), result.toDouble)
      } else if (predType.equals("date")) {
        result2 = Row(r.getString(0), getDate(result))
      } else if (predType.equals("timestamp")) {
        result2 = Row(r.getString(0), getTimestamp(result))
      }

      result2
    }

    def getDate(x: Any): java.sql.Date = {
      val format = new SimpleDateFormat("yyyy-MM-dd")
      if (x.toString() == "")
        null
      else {
        val d = format.parse(x.toString())
        // use epoch milliseconds (getTime), not the deprecated day-of-month (getDate)
        new Date(d.getTime())
      }
    }

    def getTimestamp(x: Any): java.sql.Timestamp = {
      val format = new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss")
      if (x.toString() == "")
        null
      else {
        val d = format.parse(x.toString())
        new Timestamp(d.getTime())
      }
    }
}

The call in handleTypes now looks like this:

newRdd = newRdd.map(r => FunctionSerializable.extractObj(r, predType)) 

...and everything works fine. :-)
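To illustrate why moving the function into a separate object helps, here is a minimal, self-contained sketch (the class and object names are hypothetical, not our actual code). A method defined on a non-serializable driver-side class drags a reference to that instance into every task closure, while a method on a standalone serializable object can be referenced from a closure without capturing anything extra:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-ins for the real generator code.
class DriverSideHelper {                         // NOT Serializable
  def tag(s: String): String = s + "!"
}

object SerializableHelper extends Serializable { // safe to call from task closures
  def tag(s: String): String = s + "!"
}

object ClosureCaptureDemo {
  def main(args: Array[String]): Unit = {
    // the master is normally supplied via spark-submit; default to local for a quick test
    val conf = new SparkConf().setAppName("closure-capture-demo").setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(Seq("a", "b", "c"))

    val helper = new DriverSideHelper
    // rdd.map(s => helper.tag(s))  // fails with "Task not serializable": the closure captures `helper`

    // No enclosing instance is captured here, so the closure ships cleanly to the executors:
    rdd.map(s => SerializableHelper.tag(s)).collect().foreach(println)

    sc.stop()
  }
}

In our case the capture seems to have surfaced not as a plain NotSerializableException but as the broadcast fetch failure above, and only under YARN; the underlying problem, pulling the enclosing generator object into the task closures via extractObj, is what the serializable wrapper object removes.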
