2016-08-23 8 views
0

Spark、Scala、Cassandraの新機能です。 Sparkの使用MySQLからいくつかのIDを取得しようとしています。スパークシェル:タスクがシリアル化されない

import org.apache.spark.rdd.JdbcRDD 
import java.sql.{Connection, DriverManager, ResultSet} 
Class.forName("com.mysql.jdbc.Driver").newInstance 

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf 

val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ; 
myRDD.foreach(println) 

私はコンソールに印刷されたIDを見ることができます。

今、取り出したIDごとに、私はCassandraのテーブルで合計演算を行う必要があります。

は、私が個人ID

object HelloWorld { 
     def sum(id : String): Unit = { 
     val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum 
     println(each_spark_rdd) 
     } 
    } 

を渡して呼び出すことができる午前機能を作成し、

val uplink_rdd = sc.cassandraTable("keyspace", "table") 

私は個人IDを渡すことによって機能を呼び出すことができるだと見ることができるようにuplink_rdd宣言しました合計

いつ私が各fetcで同じ関数を実行しようとしているかタスク org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleanerで直列化可能ではない:例外

org.apache.spark.SparkExceptionと同じ例外を与える

myRDD.map(HelloWorld.sum) 
or 
myRDD.foreach(HelloWorld.sum) 
or 
for (id <- myRDD) HelloWorld.sum(id) 

そのよう時間ID。スカラ:304) でorg.apache.spark.util.ClosureCleanerの$の.org $ Apacheの$スパーク$ UTIL $ ClosureCleaner $$クリーン(ClosureCleaner.scala:294) で org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clea n(SparkContext.scala:2055)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:911)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:910)at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala :111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) org.apache.spark.rdd.RDD.foreach(RDD.scala:910) $ iwC $$ iwC $:$$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:54) at $ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:59) $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC (:63) $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC(:63) $ iwC $$ iwC $$ (:67) $ iwC $$ iwC $$ iwC $$ iwC $$ iwC。:(:67) $ iwC $$ iwC $$ iwC $$ (:69) $ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:71) $ iwC $$ iwCに$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。 (:77) $ iwC $$ iwC $$ iwC(:75)$ iwC $$ iwC(:73)$ iwC $$ iwC。(:77) $ iwC。(:79)at(:81) ) (時:85)で、() で(:7)で()の$プリント() で sun.reflect.NativeMethodAccessorImpl.invokeでsun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)で。 (NativeMethodAccessorImpl.java:62) at sun.reflect.Delegating MethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) org.apache.spark.repl.SparkIMain $ ReadEvalPrint.call(SparkIMain。スカラ:1065)と org.apache.spark.repl.SparkIMain $ Request.loadAndRun(SparkIMain.scala:1346)と org.apache.spark.repl.SparkIMain.loadAndRunReq $ -1(SparkIMain.scala:840) そしてorg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)と org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)と org.apache.spark.repl。 SparkILoop.reallyInterpret $ -1(SparkILoop.scala:857)と org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala 902) とorg.apache.spark.repl.SparkILoop.command(SparkILoop.scala: 814)と org.apache.spark.repl.SparkILoop.processLine $ -1(SparkILoop.scala:657) とorg.apache.spark.repl.SparkILoop.innerLoop $ -1(SparkILoop.scala:665)と org.apache.spark.repl.SparkILoop.orgアパッチ$ $ $スパークREPL $ $$ SparkILoopループ(SparkILoop.scala :670)と org.apache.spark.repl.SparkILoop $ $$ anonfun ORGアパッチ$する$ $スパークREPL $ $$ SparkILoopプロセス1.Apply $ $ $ MCZ属(SparkILoop.scala:997)と ORG。 apache.spark.repl.SparkILoop $ $$ anonfun組織のapacheの$ $の$スパークREPL $ $$ SparkILoopプロセス1.Apply $(SparkILoop.scala:945)と org.apache.spark.repl.SparkILoop anonfun $ $$組織apacheの$ $ $スパークのREPL $ $$ SparkILoopプロセス1.Apply $(SparkILoop.scala:945)と scala.tools.nsc.util.ScalaClassLoader $ .savingContextLoader(ScalaClassLoader.scala:135)と org.apache.spark.repl.SparkILoop.orgアパッチ$ $ $スパークREPL $ $$ SparkILoopプロセス(SparkILoop.scala 945) とorg.apache.spark.repl.SparkILoop.process(1059 SparkILoop.scala) とorg.apache.spark.repl.Main $ .main(Main.scala 31)と org.apache.spark.repl.Main.main(Main.scala)と sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブ法) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) とsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) とjava.lang.reflect.Method.invoke(Method.java :498)と org.apache.spark.deploy.SparkSubmit $の.orgのapacheの$ $の$スパーク展開SparkSubmit $ $$ runMain(SparkSubmit.scala:731) と org.apache.spark.deploy.SparkSubmit .doRunMain $ 1 $(SparkSubmit.scala 181) とorg.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206)及び org.apache。 spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121)と によって引き起こさ org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala):java.io.NotSerializableException:org.apache.spark.SparkConf私は

@transient val myRDD = new JdbcRDD ... 
@transient val uplink_rdd = sc.cassandra.... 

それでもgとApache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manuallyを読んだ後RDDSに@Transientを追加しようとした

同じエラーをetting。

私は、私は、MySQLからフェッチされたIDごとにCassandaraテーブルから合計を見つけることができる方法を教えてください。

+0

よう

何か問題は、本質的に変革内部アクションを実行しようとしているということです。 'foreach'を呼び出すと、Sparkは' HelloWorld.sum'をシリアライズしてエグゼキュータのそれぞれに渡しますが、そうするためには関数のクロージャもシリアル化する必要があります。これには 'uplink_rdd'が含まれています(シリアル化できません)。 しかし、あなたがこのようなことをしようとしているとき、通常は、単に「join」などを使用したいと思っているだけです。 – Alec

+0

これをチェックしましたか? [link](https://stackoverflow.com/questions/32661018/scala-spark-task-not-serializable?rq=1) – Cfuentes

答えて

0

あなたのコードはmyRDDに変換内uplink_rddを使用しようとしています。 RDDに適用される閉鎖は別のRDDを含めることはできません。

あなたは代わりに(LY?)並列・分散してカサンドラからデータを取得するためにmyRDDからの情報を使用しますjoinWithCassandraTableの線に沿って何かをやるべき。これは、あなたがカサンドラ

から単一パーティション・キーを引いている場合は動作しますthe Docs

に別のオプションを参照してください。コネクターが使用するプールから描画手動接続を使用することです。

val cc = CassandraConnector(sc.getConf) 
myRDD.mapPartitions { it => 
    cc.withSessionDo { session => 
    session.execute("whatever query you want") 
    } 
} 

あなたが実際にあなたが に必要カサンドラに複数のパーティションを合計するしている場合は、各IDのための新しいRDDを作ります。スパークでの変換とアクションが入れ子にすることはできません -

myRDD.collect.foreach(HelloWorld.sum) 
関連する問題