2017-11-03 4 views
1

私は、JDBCを使用して点火するためのデータフレームを書き込もうと、は書き込めません/スパークから直接点火するためにデータを保存RDD

スパークバージョンがある:2.1

のIgniteバージョン:2.3

JDK:1.8

スカラ:2.11.8

が、これは私のコードスニペットです:

org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleanerで タスクはシリアライズない:

org.apache.spark.SparkException:

、その後、私は火花で実行するには、それがERROメッセージを印刷します。スカラー:298) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner $ .clean (ClarksCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply(RDD.scala:924 ) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply(RDD.scala:923) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:151) at org .apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD。 foreachPartition(RDD.scala:923) at org.apache.spark.sql.Dataset $$ anonfun $ foreachPartition $ 1.apply $ mcV $ sp(Dataset.scala:2305) at org.apache.spark.sql.Dataset $ $ anonfun $ foreachPartition $ 1.apply(Dataset.scala:2305) (org.apache.spark.sql.Dataset)$$ anonfun $ foreachPartition $ 1.apply(Dataset.scala:2305) at org.apache.spark.sql。 execution.SQLExecution $ .withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765) at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2304) at com.pingan.pilot.ignite。 common.OperationIgniteUtil $ .WriteToIgnite(OperationIgniteUtil.scala:72) com.pingan.pilot.ignite.etl.HdfsToIgnite $ .mainで(HdfsToIgnite.scala:36)com.pingan.pilot.ignite.etl.HdfsToIgniteで 。 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43でsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) でsun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド) における主(HdfsToIgnite.scala) ) at java.lang.reflect.Method.invoke(Method.java:498) at org。 apache.spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:738) at org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:187) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:212) at org.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:126) at org.apache.spark。デリバリーされていないオブジェクト(クラス:org.apache.ignite.internal)。 jdbc2。OperationIgniteUtil $$ anonfun $ WriteToIgnite $ 1、 名前:CONN $ 1、タイプ:インタフェースjava.sql.Connection) - オブジェクト(クラスcom.pingan.pilot.ignite.common.OperationIgniteUtil $$ anonfun $ WriteToIgnite $ 1、 ) でorg.apache.spark.serializer.SerializationDebugger $ .improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance .serialize(JavaSerializer.scala:100)org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:295)で ...もっと27

誰か私はそれを修正するか? ありがとう!

+0

これが役立ちます。https://stackoverflow.com/questions/43592742/spark-scala-task-not-serializable-error –

答えて

0

Serializableインターフェイスを拡張する必要があります。

object Test extends Serializable { 
    def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = { 
    ??? 
    } 
} 

あなたの問題を解決することを願っています。

2

ここでの問題点は、Ignite DataSource.connへの接続をシリアル化できないことです。 forEachPartitionに提供しているクロージャには、スコープの一部としての接続が含まれています。そのため、Sparkはそれをシリアル化できません。

Igniteには、値を保存するためのRDDのカスタム実装が用意されています。あなたはIgniteContextまず、その後、あなたのRDDのを保存するために点火するために分散アクセスを提供のIgniteの共有RDDを取得作成する必要があります:

val igniteContext = new IgniteContext(sparkContext,() => new IgniteConfiguration()) 
... 

// Retrieve Ignite's shared RDD 
val igniteRdd = igniteContext.fromCache("partitioned") 
igniteRDD.saveValues(hiveDF.toRDD) 

詳しい情報をApache Ignite documentationからアクセスできます。

関連する問題