2017-03-02 13 views
4

私は、RDDフォーマットのRDD [((ロング、ロング)、(ロング、ロング))]]を持っていて、RDDに変換する必要があります[((ロング、ロング)、 Long、Long、Long))]ここで、第2のRDDタプルは、第1のRDDからの関数に基づく。RDDマップからのスパークスカラ直列化エラー

私はこのベースマップ機能を実現しようとしていますが、私はここで何か間違っていると思います。問題を解決するのを手伝ってください。ここで

は完全なコードです:私はこのスクリプトを実行しようとすると

package com.ranker.correlation.listitem 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.rdd._ 
import scala.collection.Map 

class ListItemCorrelation(sc: SparkContext) extends Serializable { 

    def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = { 
    if (dirX.equals(1)) { 
     if (dirY.equals(1)) { 
     return (1, 0, 0, 0) 
     } else { 
     return (0, 1, 0, 0) 
     } 
    } else { 
     if (dirY.equals(1)) { 
     return (0, 0, 1, 0) 
     } else { 
     return (0, 0, 0, 1) 
     } 
    } 
    } 

    def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = { 
    val userVotes = sc.textFile(votes) 
    val userVotesPairs = userVotes.map { t => 
     val p = t.split(",") 
     (p(0).toLong, (p(1).toLong, p(2).toLong)) 
    } 
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1.<(t._2._1)) 
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2))) 
    var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2))) 
    //More functionality 
    return result 
    } 

} 
object ListItemCorrelation extends Serializable { 
    def main(args: Array[String]) { 
    val votes = args(0) 
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local") 
    val context = new SparkContext(conf) 
    val job = new ListItemCorrelation(context) 
    val results = job.run(votes) 
    val output = args(1) 
    results.saveAsTextFile(output) 
    context.stop() 
    } 
} 

私は、次のエラーを取得しています:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.map(RDD.scala:369) at com.ranker.correlation.listitem.ListItemCorrelation.run(ListItemCorrelation.scala:34) at com.ranker.correlation.listitem.ListItemCorrelation$.main(ListItemCorrelation.scala:47) at com.ranker.correlation.listitem.ListItemCorrelation.main(ListItemCorrelation.scala) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext, value: [email protected]) - field (class: com.ranker.correlation.listitem.ListItemCorrelation, name: sc, type: class org.apache.spark.SparkContext) - object (class com.ranker.correlation.listitem.ListItemCorrelation, [email protected]) - field (class: com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, name: $outer, type: class com.ranker.correlation.listitem.ListItemCorrelation) - object (class com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 12 more

このエラーが起こって次の行を実行しながら:

var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))

私はスカラーにとって非常に新しく、これを行う正しい方法を見つけるのを手伝ってください。

+0

「実行」の引数として「SparkContext」を押しますか?現在の実装では、すべての方法でドラッグされ、シリアル化ではないため、エラーが発生します。 – zero323

+0

@geek:これには解決策がありましたか? – arun

答えて

0

up_downメソッドをコンパニオンオブジェクトに配置します。 RDDクロージャ内でクラス変数にアクセスすると、そのクラス(およびその中のすべて(SparkContextなど))がシリアル化されます。メソッドパラメータはここでクラス変数としてカウントされます。静的オブジェクトを使用すると、これを回避できます。

package com.ranker.correlation.listitem 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.rdd._ 
import scala.collection.Map 

object ListItemCorrelation { 
    def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = { 
    if (dirX.equals(1)) { 
     if (dirY.equals(1)) { 
     return (1, 0, 0, 0) 
     } else { 
     return (0, 1, 0, 0) 
     } 
    } else { 
     if (dirY.equals(1)) { 
     return (0, 0, 1, 0) 
     } else { 
     return (0, 0, 0, 1) 
     } 
    } 
    } 
} 


class ListItemCorrelation(sc: SparkContext) extends Serializable { 

    def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = { 
    val userVotes = sc.textFile(votes) 
    val userVotesPairs = userVotes.map { t => 
     val p = t.split(",") 
     (p(0).toLong, (p(1).toLong, p(2).toLong)) 
    } 
    val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1.<(t._2._1)) 
    val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2))) 
    var second = first.map(t => ((t._1._1, t._2._1), ListItemCorrelation.up_down(t._1._2, t._2._2))) 
    //More functionality 
    return result 
    } 

} 
object ListItemCorrelation extends Serializable { 
    def main(args: Array[String]) { 
    val votes = args(0) 
    val conf = new SparkConf().setAppName("SparkJoins").setMaster("local") 
    val context = new SparkContext(conf) 
    val job = new ListItemCorrelation(context) 
    val results = job.run(votes) 
    val output = args(1) 
    results.saveAsTextFile(output) 
    context.stop() 
    } 
} 
関連する問題