2017-10-16 21 views
1

java.io.NotSerializableExceptionが発生します。java.io.NotSerializableExceptionが発生すると、sparkでデータを処理するときに問題が発生します。それはjava.io.NotSerializableExceptionの原因を失敗し、従わしかしSparkストリーミングを使用してrddをHbaseに保存すると、

val hbase_conf = HBaseConfiguration.create() 
hbase_conf.set("hbase.zookeeper.property.clientPort", "2181") 
hbase_conf.set("hbase.zookeeper.quorum", "hadoop-zk0.s.qima-inc.com,hadoop-zk1.s.qima-inc.com,hadoop-zk2.s.qima-inc.com") 
val newAPIJobConfiguration = Job.getInstance(hbase_conf); 
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table"); 
newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]]) 
newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp") 
mydata.foreachRDD(rdd => { 
    val json_rdd = rdd.map(Json.parse _).map(_.validate[Scan]) 
    .map(Scan.transformScanRestult _) 
    .filter(_.nonEmpty) 
    .map(_.get) 
    .map(Scan.convertForHbase _) 
    json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration) 
}) 

object mytest_config{ 
    val hbase_conf = HBaseConfiguration.create() 
    hbase_conf.set("hbase.zookeeper.property.clientPort", "2181") 
    hbase_conf.set("hbase.zookeeper.quorum", "zk1,zk2") 
    val newAPIJobConfiguration = Job.getInstance(hbase_conf); 
    newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytest_table"); 
    newAPIJobConfiguration.setOutputFormatClass(classOf[org.apache.hadoop.hbase.mapreduce.TableOutputFormat[ImmutableBytesWritable]]) 
    newAPIJobConfiguration.getConfiguration().set("mapreduce.output.fileoutputformat.outputdir", "/tmp") 
    } 

mydata.foreachRDD(rdd => { 
     val json_rdd = rdd.map(Json.parse _) 
     .map(_.validate[Scan]) 
     .map(Scan.transformScanRestult _) 
     .filter(_.nonEmpty) 
     .map(_.get) 
     .map(Scan.convertForHbase _) 

    json_rdd.saveAsNewAPIHadoopDataset(mytest_config.newAPIJobConfiguration.getConfiguration) 
}) 

そして、これは仕事ができると、次のように私は私のコードを変更するエラー情報

17/10/16 18:56:50 ERROR Utils: Exception encountered 
     java.io.NotSerializableException: org.apache.hadoop.mapreduce.Job 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 

です! 誰かがこの作品の理由を知り、公式に推奨される方法は何ですか?

答えて

2

というエラーが原因

このnewAPIJobConfigurationにたぶん私は非常に明確に私の質問を説明していなかった内部の労働者(foreach

json_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration) 
+0

を使っているドライバー

val newAPIJobConfiguration = Job.getInstance(hbase_conf); 

で初期化されました。私は、なぜこの例ではmytest-configが機能するかなど、すべての設定をscalaオブジェクトに入れるのか知りたいのです。オブジェクトがドライバで起動されたかどうか –

+0

これは 'mytest_config'が' serializable'であり、他のものが – mrsrinivas

+0

ではありません。オブジェクトがドライバから作業者にシリアル化不可能なオブジェクトを変換することを可能にすることができます。大いに感謝する。 lazy valを使用する必要があるときにもう1つ質問があります。例を挙げてください。 –

関連する問題