2016-09-20

I am trying to insert rows into MySQL through foreachPartition, but I get the error org.apache.spark.SparkException: Task not serializable (Java).

While trying to write the results to MySQL, I get the following error:

6/09/20 12:37:58 INFO SparkContext: Created broadcast 0 from textFile at Insert.java:41 
org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919) 
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) 
    at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46) 
    at final_file.Insert.main(Insert.java:59) 
Caused by: java.io.NotSerializableException: java.lang.Object 
Serialization stack: 
    - object not serializable (class: java.lang.Object, value: java.lang.Object@<hash>) 
    - writeObject data (class: java.util.HashMap) 
    - object (class java.util.HashMap, {<3 entries>}) 
    - field (class: com.mysql.jdbc.ConnectionImpl, name: charsetConverterMap, type: interface java.util.Map) 
    - object (class com.mysql.jdbc.JDBC4Connection, com.mysql.jdbc.JDBC4Connection@<hash>) 
    - field (class: final_file.Insert$2, name: conn, type: interface com.mysql.jdbc.Connection) 
    - object (class final_file.Insert$2, final_file.Insert$2@<hash>) 
    - field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1, name: f$12, type: interface org.apache.spark.api.java.function.VoidFunction) 
    - object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
    ... 12 more 

This is the code that produces the error above:

package final_file;

import java.io.Serializable;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.mysql.jdbc.Connection;

public class Insert implements Serializable {

    transient static JavaSparkContext spc;

    public static void main(String gg[])
    {
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://localhost:3306/testing?user=root&password=pwd");
        options.put("dbtable", "rtl");
        SparkConf ss = new SparkConf().setAppName("insert").setMaster("local");

        spc = new JavaSparkContext(ss);

        JavaRDD<String> rbm = spc.textFile(path); // path: input file, not shown in the question
        // DataFrame jdbcDF = sqlContext.jdbc(options.get("url"), options.get("dbtable"));

        // System.out.println("Data------------------->" + jdbcDF.toJSON().first());

        // Keep only the first space-separated token of each line
        JavaRDD<String> file = rbm.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" ")[0]);
            }
        });

        try {
            file.foreachPartition(new VoidFunction<Iterator<String>>() {
                Connection conn = (Connection) DriverManager.getConnection("jdbc:mysql://localhost/testing", "root", "<password>");

                PreparedStatement del = conn.prepareStatement("INSERT INTO rtl (rtl_s) VALUES (?) ");

                public void call(Iterator<String> x) throws Exception {
                    while (x.hasNext())
                    {
                        String y = x.next();
                        del.setString(1, y);
                        del.executeUpdate();
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


What does DriverManager contain? It seems that it is not serializable. –


Actually, it contains the MySQL properties: the user name, the password and the database name. – Aman
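The Serialization stack in the trace points at what actually fails: not DriverManager itself but the conn field of the anonymous function (final_file.Insert$2), whose JDBC4Connection holds a charsetConverterMap with plain java.lang.Object values. A minimal sketch that reproduces the same failure with plain Java serialization, assuming hypothetical stand-in classes (FakeConnection and InsertFunction are illustrations, not MySQL or Spark types):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ClosureSerializationDemo {

    // Hypothetical stand-in for com.mysql.jdbc.ConnectionImpl: the class itself is
    // Serializable, but one field holds a HashMap whose values are plain java.lang.Object.
    static class FakeConnection implements Serializable {
        Map<String, Object> charsetConverterMap = new HashMap<>();

        FakeConnection() {
            charsetConverterMap.put("UTF-8", new Object());
        }
    }

    // Hypothetical stand-in for the anonymous VoidFunction (Insert$2): the connection
    // is a field, so it is part of the object graph that gets serialized.
    static class InsertFunction implements Serializable {
        FakeConnection conn = new FakeConnection();
    }

    // Serializes the object with plain Java serialization and returns the class name
    // reported by NotSerializableException, or null if serialization succeeds.
    static String findNotSerializable(Object closure) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // the message is the offending class name
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(findNotSerializable(new InsertFunction()));
    }
}
```

Running main prints java.lang.Object, the same class named in the "Caused by" line: serialization follows every non-transient field reachable from the function object, so the connection it holds is dragged along.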

Answer

> The issue was resolved by moving the connection setup inside call() in the try block:
> 
> try {
>     file.foreachPartition(new VoidFunction<Iterator<String>>() {
>         public void call(Iterator<String> x) throws Exception {
>             // Created inside call(), so the connection is opened on the executor
>             // for each partition and is never part of the serialized function.
>             Connection conn = (Connection) DriverManager.getConnection("jdbc:mysql://localhost/testing", "root", "<password>");
>             PreparedStatement del = conn.prepareStatement("INSERT INTO rtl (rtl_s) VALUES (?) ");
>             try {
>                 while (x.hasNext())
>                 {
>                     String y = x.next();  // advance to the next record
>                     del.setString(1, y);
>                     del.executeUpdate();
>                 }
>             } finally {
>                 del.close();
>                 conn.close();
>             }
>         }
>     });
> } catch (Exception e) {
>     e.printStackTrace();
> }
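Why this change works can be sketched without Spark: the function object is serialized on the driver and deserialized where it runs, so a function whose only state is created inside call() serializes cleanly. A minimal sketch, assuming a hypothetical non-serializable FakeConnection in place of the JDBC connection and a plain Java serialization round trip in place of Spark's own closure shipping:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PerPartitionResourceDemo {

    // Hypothetical stand-in for a JDBC connection: deliberately NOT Serializable.
    static class FakeConnection {
        final List<String> inserted = new ArrayList<>();

        void insert(String row) {
            inserted.add(row);
        }
    }

    // Mirrors the fixed VoidFunction: it has no connection field, so serializing
    // it never touches FakeConnection.
    static class InsertPartition implements Serializable {
        List<String> call(Iterator<String> rows) {
            FakeConnection conn = new FakeConnection(); // opened once per partition, after shipping
            while (rows.hasNext()) {
                conn.insert(rows.next());               // next() advances the iterator
            }
            return conn.inserted;                       // returned only so the demo can inspect it
        }
    }

    // Plain Java serialization round trip, standing in for shipping the function
    // from the driver to an executor.
    static InsertPartition roundTrip(InsertPartition f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (InsertPartition) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InsertPartition shipped = roundTrip(new InsertPartition());
        System.out.println(shipped.call(Arrays.asList("a", "b", "c").iterator()));
    }
}
```

Running main prints [a, b, c]. Opening one connection per partition, rather than per record, is the usual reason to use foreachPartition instead of foreach here.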

http://stackoverflow.com/questions/40818001/understanding-spark-serialization/40818002?sfb=2#40818002 An overview for understanding Spark serialization. – KrazyGautam
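Another pattern sometimes used for the same problem (it is not part of the answer above) is a transient field that is recreated lazily on first use: Java serialization skips transient fields and leaves them null after deserialization. A hedged sketch with hypothetical names, again using a plain Java round trip as a stand-in for Spark:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientLazyDemo {

    // Hypothetical non-serializable resource standing in for a JDBC connection.
    static class FakeConnection {
        final List<String> rows = new ArrayList<>();
    }

    static class InsertFunction implements Serializable {
        // transient: skipped by Java serialization, so it is null after deserialization.
        private transient FakeConnection conn;

        // Creates the resource on first use in whichever JVM calls it.
        private FakeConnection connection() {
            if (conn == null) {
                conn = new FakeConnection();
            }
            return conn;
        }

        void call(String row) {
            connection().rows.add(row);
        }

        boolean hasConnection() {
            return conn != null;
        }
    }

    // Plain Java serialization round trip, standing in for driver-to-executor shipping.
    static InsertFunction roundTrip(InsertFunction f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(f);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (InsertFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InsertFunction onDriver = new InsertFunction();
        onDriver.call("x");                               // connection created before shipping
        InsertFunction onExecutor = roundTrip(onDriver);  // succeeds: the transient field is skipped
        System.out.println(onExecutor.hasConnection());   // transient field arrives as null
        onExecutor.call("y");
        System.out.println(onExecutor.hasConnection());   // recreated lazily on first use
    }
}
```

Running main prints false and then true. The sketch never closes the resource; real code would still have to decide where a connection created this way gets closed.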
