
Calling a function within an RDD map function on a Spark cluster

I was testing a simple string-parser function defined in my code, but one of the worker nodes always fails at run time.

/* JUST A SIMPLE PARSER TO CLEAN PARENTHESES */
def parseString(field: String): String = {
    val Pattern = "(.*.)".r
    field match {
        case "null" => "null"
        case Pattern(field) => field.replace('(',' ').replace(')',' ').replace(" ", "")
    }
}
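
As a side note, this match is non-exhaustive: an empty string matches neither case and would throw a scala.MatchError. A total version might look like the following sketch; the fall-through case is an assumption, not part of the original code.

def parseStringTotal(field: String): String = {
    val Pattern = "(.*.)".r
    field match {
        case "null" => "null"
        case Pattern(inner) => inner.replace('(',' ').replace(')',' ').replace(" ", "")
        case other => other // e.g. "", which "(.*.)" does not match
    }
}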

/* CREATE TWO DISTRIBUTED RDDs TO JOIN THEM */ 
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)), 6) 
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)), 6) 
val manipulated_emp = emp.keyBy(t => t._3) 
val manipulated_dept = dept.keyBy(t => t._2) 
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept) 
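/* NOTE: left_outer_join_data has type
   RDD[(Int, ((Int, String, Int), Option[(String, Int)]))],
   which is the structure the f._2._1._1-style accessors below navigate. */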

/* OUTPUT */ 
left_outer_join_data.collect.foreach(println) 
/* 
(30,((3,matt,30),Some((hive,30)))) 
(30,((5,rhonda,30),Some((hive,30)))) 
(20,((2,ricky,20),Some((spark,20)))) 
(10,((1,jordan,10),Some((hadoop,10)))) 
(35,((4,mince,35),None)) 
*/ 

val res = left_outer_join_data 
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString)) 
.collect 

res 
.map(f => (f._1, f._2, parseString(f._3))) 
.foreach(println) 
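
/* NOTE: after collect, res is a plain local Array, so the map above and
   parseString run entirely on the driver JVM, not on the executors. */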

/* DESIRED OUTPUT */ 
/* 
(3,matt,hive,30) 
(5,rhonda,hive,30) 
(2,ricky,spark,20) 
(1,jordan,hadoop,10) 
(4,mince,null) 
*/ 

This code works if I first collect the results of res to the driver. Since this is just a test with dummy data, that is not a problem here, but the real application processes millions of rows, and collecting the results to the driver is not advisable. If I do the same without collecting first, like this:

val res = left_outer_join_data 
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString)) 

res 
.map(f => (f._1, f._2, parseString(f._3))) 
.foreach(println) 

I get the following:

ERROR TaskSetManager: Task 5 in stage 17.0 failed 4 times; aborting job 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 17.0 failed 4 times, most recent failure: Lost task 5.3 in stage 17.0 (TID 166, 192.168.28.101, executor 1): java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$ 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
     at scala.Option.foreach(Option.scala:257) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:916) 
     at tele.com.SimcardMsisdn$.main(SimcardMsisdn.scala:249) 
     at tele.com.SimcardMsisdn.main(SimcardMsisdn.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$ 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
     at org.apache.spark.scheduler.Task.run(Task.scala:99) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Why does Spark fail to run my parser on the nodes? Can you recommend a solution or a workaround?

UPDATE

I found a solution to this problem (posted below); nevertheless, I am still puzzled by this behaviour. Maybe I am doing something wrong. The fix was to broadcast the Pattern variable to the workers:

val Pattern = sc.broadcast("(.*.)".r) 

and to do the pattern matching inside the map itself, not in a separate function.

Answers


Well, I managed to solve it myself by broadcasting the Pattern variable to the workers and doing the pattern matching in the map, without collecting to the driver first:

val res = left_outer_join_data.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
res.map(f => (f._1, f._2, f._3 match {
        case "null" => "null"
        case Pattern.value(f._3) => f._3.replace('(',' ').replace(')',' ').replace(" ", "")
    }))
    .foreach(println)

Then I got the desired output from the workers' stdout:

(3,matt,hive,30) 
(5,rhonda,hive,30) 
(2,ricky,spark,20) 
(1,jordan,hadoop,10) 
(4,mince,null) 
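
For what it's worth, 'java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$' usually means that the static initializer of the application object failed on the executors, so anything defined inside that object (including parseString) becomes unusable there; broadcasting works around this because the map no longer touches the object. Under that assumption, another common workaround is to move the parser into its own small top-level object. The sketch below is illustrative, not from the original post:

object StringCleaner {
    // Compiled once per JVM; no reference back to the main application object.
    private val Pattern = "(.*.)".r

    def parse(field: String): String = field match {
        case "null" => "null"
        case Pattern(inner) => inner.replace('(',' ').replace(')',' ').replace(" ", "")
        case other => other
    }
}

res.map(f => (f._1, f._2, StringCleaner.parse(f._3))).foreach(println)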

You could also use 'case (a, ((b, c, d), Some((e, f, g)))) => (b, c, ...)' on your 'left_outer_join_data'. It can make the map more readable. :) – philantrovert


Sure. Thanks for the tip ;) – Emiliano
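
For reference, a destructured version of the first map along these lines might look like the following sketch (the names are illustrative, and getOrElse still covers the None row):

val res = left_outer_join_data.map {
    case (_, ((id, name, _), dept)) => (id, name, dept.getOrElse("null").toString)
}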
