2017-06-16 9 views
0

2つのRDDの間で結合方法を使用して、それをcassandraに保存しようとしていますが、コードが機能しません。初めに、私は巨大なメイン​​メソッドとすべてうまく動作しますが、私は関数とクラスを使用すると、これは動作しません。私は、Scalaのに新しいですし、Scalaの直列化可能なエラー結合が機能しない

コードがあるスパーク:スレッドの例外「メイン」org.apache.spark.SparkException:タスクで直列化可能ではない私は有名なを取得beginigで

class Migration extends Serializable { 

    case class userId(offerFamily: String, bp: String, pdl: String) extends Serializable 
    case class siteExternalId(site_external_id: Option[String]) extends Serializable 
    case class profileData(begin_ts: Option[Long], Source: Option[String]) extends Serializable 

    def SparkMigrationProfile(sc: SparkContext) = { 

    val test = sc.cassandraTable[siteExternalId](KEYSPACE,TABLE) 
    .keyBy[userId] 
    .filter(x => x._2.site_external_id != None) 

    val profileRDD = sc.cassandraTable[profileData](KEYSPACE,TABLE) 
    .keyBy[userId] 

    //dont work 
    test.join(profileRDD) 
    .foreach(println) 

    // don't work 
    test.join(profileRDD) 
    .saveToCassandra(keyspace, table) 

    } 

。 。 。 私はメインクラスとケースクラスを拡張しますが、stilは機能しません。

答えて

0

ケースクラスをMigrationクラスから専用ファイルおよび/またはオブジェクトに移動する必要があると思います。これはあなたの問題を解決するはずです。さらに、Scalaのケースクラスはデフォルトで直列化可能です。

+0

私は今、とても愚かになった。 。 。 なぜ私に説明できますか? – user3394825

+0

こんにちは@ user3394825、私はカッサンドラとスパークを使用していないので、言うことは難しいです。私の経験に基づいて、他のクラスで定義されたケースクラスを使用するときにも同様の問題がありました。あなたの状況では、 'cassandraTable'関数のための暗黙のパラメータを作成することに何らかの問題があります(https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/)。 com/datastax/spark/connector/SparkContextFunctions.scala)eg 'rrf:RowReaderFactory [T]、 ev:ValidRDDType [T]'しかし、私はちょうど推測しています。 Spark SQL Encoderを使用するときにも同様の例外があることはわかっています。 –

+0

ケースクラスは、技術的には、Migrationの囲むインスタンスにアクセスできる内部クラスです。それらがシリアル化されると、付随するMigrationオブジェクトもシリアル化されます。また、Serializableとマークされているにもかかわらず、インスタンス変数の中にはおそらくインスタンス変数があります。多くの場合、SparkContextオブジェクトが原因です。 –

関連する問題