2016-10-25 2 views
0

私のジョブにはRDD [(key、value)]としてデータ・フレームを変換するステップがありますが、ステップは3回3回実行され、スパーク・ジョブは同じマップを3回実行することで失敗を続けます

アクティブなジョブ(1)

Job Id (Job Group)  Description Submitted Duration Stages: Succeeded/Total Tasks (for all stages): Succeeded/Total 

    3 (zeppelin-20161017-005442_839671900) Zeppelin map at <console>:69  2016/10/25 05:50:02 1.6 min  0/1  210/45623 

完了ジョブ(2)

2 (zeppelin-20161017-005442_839671900) Zeppelin map at <console>:69 2016/10/25 05:16:28  23 min 1/1  46742/46075 (21 failed) 
    1 (zeppelin-20161017-005442_839671900) Zeppelin map at <console>:69 2016/10/25 04:47:58  17 min 1/1  47369/46795 (20 failed) 
:と

スパークUIを示して失敗しました3210

これはコードです:

val eventsRDD = eventsDF.map { 

     r => 
     val customerId = r.getAs[String]("customerId") 
     val itemId = r.getAs[String]("itemId") 
     val countryId = r.getAs[Long]("countryId").toInt 
     val timeStamp = r.getAs[String]("eventTimestamp") 

     val totalRent = r.getAs[Int]("totalRent") 
     val totalPurchase = r.getAs[Int]("totalPurchase") 
     val totalProfit = r.getAs[Int]("totalProfit") 

     val store = r.getAs[String]("store") 
     val itemName = r.getAs[String]("itemName") 

     val itemName = if (itemName.size > 0 && itemName.nonEmpty && itemName != null) itemName else "NA" 


     (itemId, (customerId, countryId, timeStamp, totalRent, totalProfit, totalPurchase, store,itemName)) 



    } 

誰かがここで間違っているものを言うことはできますか?もし私がしなければならないキャッシュを永続化したいのですか?

エラー:

16/10/25 23:28:55 INFO YarnClientSchedulerBackend: Asked to remove non-existent executor 181 
16/10/25 23:28:55 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_1477415847345_0005_02_031011 on host: ip-172-31-14-104.ec2.internal. Exit status: 52. Diagnostics: Exception from container-launch. 
Container id: container_1477415847345_0005_02_031011 
Exit code: 52 
Stack trace: ExitCodeException exitCode=52: 
       at org.apache.hadoop.util.Shell.runCommand(Shell.java:545) 
       at org.apache.hadoop.util.Shell.run(Shell.java:456) 
       at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722) 
       at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211) 
       at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) 
       at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) 
       at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
       at java.lang.Thread.run(Thread.java:745) 
+3

あなたのコードはコンパイルされません - 'itemName> 0'は文字列とIntを比較しようとします。 –

+0

@TzachZohar申し訳ありませんが、ここに入力ミスがありました。私はサイズの比較を持っています – Newbie

答えて

0

あなたマップ操作は、いくつかのエラーが発生し、そのタスクの失敗につながるドライバにpropogates。仕事をあきらめる前に、特定のタスクの失敗の

番号:デフォルトspark.task.maxFailuresことで

はのためにある4としての価値を持っています。 異なるタスクにまたがって発生した失敗の総数は、 によってジョブが失敗することはありません。特定のタスクがこの数の 試行を失敗しなければなりません。あなたのタスクは、スパークは、それがすべてで4回失敗した刚性マップ操作を再計算しようと失敗したときに1

だから何が起こる - 以上許さ 再試行の1.数=この値に等しくなければなりません。

もし私がしなければならないキャッシュを永続化したいのですか? cacheは、特定の永続操作であり、RDDがデフォルトのストレージレベル(MEMORY_ONLY)で保持されます。

+0

しかし、私はなぜこのマップを計算することができないのか分かりません。 – Newbie

+0

ログを確認して、ワーカーログとドライバログの両方のエラーを確認できます。 –

関連する問題