0
2つのRDDの間で結合方法を使用して、それをcassandraに保存しようとしていますが、コードが機能しません。初めに、私は巨大なメインメソッドとすべてうまく動作しますが、私は関数とクラスを使用すると、これは動作しません。私は、Scalaのに新しいですし、Scalaの直列化可能なエラー結合が機能しない
コードがあるスパーク:スレッドの例外「メイン」org.apache.spark.SparkException:タスクで直列化可能ではない私は有名なを取得beginigで
class Migration extends Serializable {
case class userId(offerFamily: String, bp: String, pdl: String) extends Serializable
case class siteExternalId(site_external_id: Option[String]) extends Serializable
case class profileData(begin_ts: Option[Long], Source: Option[String]) extends Serializable
def SparkMigrationProfile(sc: SparkContext) = {
val test = sc.cassandraTable[siteExternalId](KEYSPACE,TABLE)
.keyBy[userId]
.filter(x => x._2.site_external_id != None)
val profileRDD = sc.cassandraTable[profileData](KEYSPACE,TABLE)
.keyBy[userId]
//dont work
test.join(profileRDD)
.foreach(println)
// don't work
test.join(profileRDD)
.saveToCassandra(keyspace, table)
}
。 。 。 私はメインクラスとケースクラスを拡張しますが、stilは機能しません。
私は今、とても愚かになった。 。 。 なぜ私に説明できますか? – user3394825
こんにちは@ user3394825、私はカッサンドラとスパークを使用していないので、言うことは難しいです。私の経験に基づいて、他のクラスで定義されたケースクラスを使用するときにも同様の問題がありました。あなたの状況では、 'cassandraTable'関数のための暗黙のパラメータを作成することに何らかの問題があります(https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/)。 com/datastax/spark/connector/SparkContextFunctions.scala)eg 'rrf:RowReaderFactory [T]、 ev:ValidRDDType [T]'しかし、私はちょうど推測しています。 Spark SQL Encoderを使用するときにも同様の例外があることはわかっています。 –
ケースクラスは、技術的には、Migrationの囲むインスタンスにアクセスできる内部クラスです。それらがシリアル化されると、付随するMigrationオブジェクトもシリアル化されます。また、Serializableとマークされているにもかかわらず、インスタンス変数の中にはおそらくインスタンス変数があります。多くの場合、SparkContextオブジェクトが原因です。 –