
Error while performing a simple count operation on a Spark DataFrame

I loaded data like this from a text file:

1 The Nightmare Before Christmas 1993 3.9 4568 
2 The Mummy 1932 3.5 4388 
3 Orphans of the Storm 1921 3.2 9062 
4 The Object of Beauty 1991 2.8 6150 
5 Night Tide 1963 2.8 5126 
6 One Magic Christmas 1985 3.8 5333 
7 Muriel's Wedding 1994 3.5 6323 
8 Mother's Boys 1994 3.4 5733 
9 Nosferatu: Original Version 1929 3.5 5651 
10 Nick of Time 1995 3.4 5333 
11 Broken Blossoms 1919 3.3 5367 
12 Big Night 1996 3.6 6561 
13 The Birth of a Nation 1915 2.9 12118 
14 The Boys from Brazil 1978 3.6 7417 

Step 1:

//Loaded into rdd 
val rddLoad = sc.textFile("/user/rahulm.3564_gmail/IMDB_DATA.txt"); 
//Split based on commas since it is a comma separated file 
val rddLoadSplit = rddLoad.map(_.split(',')) 

Step 2: Then I created a DataFrame using the following commands:

case class MovieData(serialNo:Int, movieName:String, releaseYear:Int, rating:Double, runningTime:Int); 

val dfSchema = rddLoadSplit.map {
  case Array(serialNo, movieName, releaseYear, rating, runningTime) =>
    MovieData(serialNo.toInt, movieName, releaseYear.toInt, rating.toDouble, runningTime.toInt)
}.toDF()

Step 3: dfSchema.show gives the proper result:

scala> dfSchema.show

only showing top 20 rows 

Step 4: But now when I run dfSchema.count, I get the following error:

17/08/28 11:36:24 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 47, i 
p-172-31-58-214.ec2.internal): scala.MatchError: [Ljava.lang.String;@62202bc4 (of class [Ljava.lang.String;) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:505) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95) 
     at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Comment: Is the data comma-separated or tab-separated? Your data looks tab-separated, but your code splits on commas; why? Use a reader with the proper delimiter and mode set to DROPMALFORMED.

Answer:


This happens because the data contains malformed rows that do not follow the expected format, so this pattern match fails (show succeeds because it only evaluates the first 20 rows, while count scans the entire file and hits the bad rows):

{case Array(serialNo, movieName, releaseYear, rating, runningTime) => MovieData(serialNo.toInt, movieName, releaseYear.toInt, rating.toDouble, runningTime.toInt)} 
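To confirm this, a quick diagnostic (a sketch reusing rddLoad from Step 1) is to tally how many fields each line actually splits into; any count other than 5 exposes rows that cannot match the pattern:

// Tally the number of comma-separated fields per line; keys other than 5
// mark the rows that cannot match Array(_, _, _, _, _).
rddLoad.map(_.split(',').length).countByValue().foreach(println)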
Instead of map, I would use collect with a PartialFunction (you still have to handle exceptions while casting):

rddLoadSplit.collect {
  case Array(serialNo, movieName, releaseYear, rating, runningTime) =>
    MovieData(serialNo.toInt, movieName, releaseYear.toInt, rating.toDouble, runningTime.toInt)
}.toDF()
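To also guard the casts, a minimal sketch (assuming the MovieData case class and the toDF implicits from the question are in scope) is to wrap the conversions in scala.util.Try and keep only the successes:

import scala.util.Try

// flatMap + Try drops rows whose fields match the expected shape but fail
// numeric conversion, instead of throwing NumberFormatException at action time.
val dfSafe = rddLoadSplit.flatMap {
  case Array(serialNo, movieName, releaseYear, rating, runningTime) =>
    Try(MovieData(serialNo.trim.toInt, movieName, releaseYear.trim.toInt,
      rating.trim.toDouble, runningTime.trim.toInt)).toOption
  case _ => None
}.toDF()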

Or read the file directly with the CSV reader in DROPMALFORMED mode:

spark.read.format("csv").option("mode", "DROPMALFORMED").load("/user/rahulm.3564_gmail/IMDB_DATA.txt") 
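As the comment above notes, the sample rows look tab-separated rather than comma-separated; a hypothetical variant that sets the delimiter explicitly and supplies a schema (so the columns come back typed instead of all strings) might look like:

import org.apache.spark.sql.types._

// Assumption: the file is tab-separated, matching the sample shown above.
val movieSchema = StructType(Seq(
  StructField("serialNo", IntegerType),
  StructField("movieName", StringType),
  StructField("releaseYear", IntegerType),
  StructField("rating", DoubleType),
  StructField("runningTime", IntegerType)))

val moviesDF = spark.read.format("csv")
  .option("mode", "DROPMALFORMED")
  .option("delimiter", "\t")
  .schema(movieSchema)
  .load("/user/rahulm.3564_gmail/IMDB_DATA.txt")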