2016-07-20 2 views
0

Spark-Shellでは正しく動作するが、EclipseではScalaプラグインでは動作しない小さなScalaコードがあります。私は別のファイルへの書き込みしようとしたプラグインを使用してHDFSにアクセスすることができ、それが働いた。..Eclipseで動作しないコードがあります。

FirstSpark.scala

package bigdata.spark 
import org.apache.spark.SparkConf 
import java. io. _ 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 

object FirstSpark { 

    def main(args: Array[String])={ 
    val conf = new SparkConf().setMaster("local").setAppName("FirstSparkProgram") 
    val sparkcontext = new SparkContext(conf) 
    val textFile =sparkcontext.textFile("hdfs://pranay:8020/spark/linkage") 
    val m = new Methods() 
    val q =textFile.filter(x => !m.isHeader(x)).map(x=> m.parse(x)) 
    q.saveAsTextFile("hdfs://pranay:8020/output") } 
} 

package bigdata.spark 
import java.util.function.ToDoubleFunction 

class Methods { 
def isHeader(s:String):Boolean={ 
    s.contains("id_1") 
} 
def parse(line:String) ={ 
    val pieces = line.split(',') 
    val id1=pieces(0).toInt 
    val id2=pieces(1).toInt 
    val matches=pieces(11).toBoolean 
    val mapArray=pieces.slice(2, 11).map(toDouble) 
    MatchData(id1,id2,mapArray,matches) 
    } 
def toDouble(s: String) = { 
    if ("?".equals(s)) Double.NaN else s.toDouble 
} 
} 
case class MatchData(id1: Int, id2: Int, 
scores: Array[Double], matched: Boolean) 

エラーメッセージMethods.scala:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) 

を缶誰もがこれで私を助けてください

答えて

0

class Methods { .. }object Methods { .. }に変更してみてください。

問題はval q =textFile.filter(x => !m.isHeader(x)).map(x=> m.parse(x))にあると思います。 Sparkがfiltermap関数を見ると、渡された関数(x => !m.isHeader(x)x=> m.parse(x))を直列化しようとするので、すべてのエグゼキュータにそれらを実行する作業がディスパッチされます(これは参照されるタスクです)。しかし、これを行うには、mをシリアル化する必要があります。このオブジェクトは関数内で参照されるため(2つの匿名メソッドのクロージャにあります)、Methodsはシリアル化できないため、これを行うことはできません。 Methodsクラスにextends Serializableを追加できますが、この場合はobjectが適切です(そしてすでにシリアライズ可能です)。

+0

ありがとうございました。出来た。 – Pranay

+0

@Pranay Glad私は手伝った!完全性のために、あなたが 'x =>!(new Methods())。isHeader(x)'を使ったとしても、Sparkは定義を取得するために 'メソッド'の全体を直列化しようとするので、 isHeaderのです。 – Alec

関連する問題