2016-10-22 5 views
2

スカラを使用してSparkRDDをHBaseテーブルに書き込もうとしています(以前は使用されていませんでした)。全体のコードはこれです:Scalaを使用してSpaseRDDをHBaseテーブルに書き込む

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} 
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
import scala.collection.JavaConverters._ 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.spark._ 
import org.apache.hadoop.mapred.JobConf 
import org.apache.spark.rdd.PairRDDFunctions 
import org.apache.spark.SparkContext._ 
import org.apache.hadoop.mapred.Partitioner; 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.client._ 

object HBaseWrite { 
    def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    val sc = new SparkContext(sparkConf) 
    val conf = HBaseConfiguration.create() 
    val outputTable = "tablename" 

    System.setProperty("user.name", "hdfs") 
    System.setProperty("HADOOP_USER_NAME", "hdfs") 
    conf.set("hbase.master", "localhost:60000") 
    conf.setInt("timeout", 120000) 
    conf.set("hbase.zookeeper.quorum", "localhost") 
    conf.set("zookeeper.znode.parent", "/hbase-unsecure") 
    conf.setInt("hbase.client.scanner.caching", 10000) 
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result])) 
    val jobConfig: JobConf = new JobConf(conf,this.getClass) 
    jobConfig.setOutputFormat(classOf[TableOutputFormat]) 
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE,outputTable) 
    val x = 12 
    val y = 15 
    val z = 25 
    var newarray = Array(x,y,z) 
    val newrddtohbase = sc.parallelize(newarray) 
    def convert(a:Int) : Tuple2[ImmutableBytesWritable,Put] = { 
      val p = new Put(Bytes.toBytes(a)) 
      p.add(Bytes.toBytes("columnfamily"), 
      Bytes.toBytes("col_1"), Bytes.toBytes(a)) 
      new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(a.toString.getBytes()), p); 
    } 
    new PairRDDFunctions(newrddtohbase.map(convert)).saveAsHadoopDataset(jobConfig) 
    sc.stop() 
    } 
} 

私はHBaseWrite(メイン(Array()を)行った後に取得エラーはこれです:?私はそれを成し遂げるために

+0

あなたの 'convert'メソッドを' map'メソッドの 'function literal'として渡して、問題を解決します。 – Shankar

答えて

0

例えば、以下の方法はあなたがtoDouble(2)を使用することができますし、それはあなたがあなたの方法は以下のように関数リテラルに変換することができます同じよう2.0

を返しダブル

var toDouble: (Int) => Double = a => { 
    a.toDouble 
} 

を引数としてのIntを取り、返します。

+0

ダウン有権者、来る...いくつかのコメントを追加.. – Shankar

+0

私はなぜ誰かがこれをdownvoteだろうかと思う。それは私のためにエラーを削除しました。 –

2

進めるにはどうすればよい

org.apache.spark.SparkException: Task not serializable 

あなたはこのようにこのコードを記述する場合は、ここで間違ってやっていることは、それが動作する可能性があり convert mainの内側を定義している:

object HBaseWrite { 
     def main(args: Array[String]) { 
     val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
     val sc = new SparkContext(sparkConf) 
     val conf = HBaseConfiguration.create() 
     val outputTable = "tablename" 

     System.setProperty("user.name", "hdfs") 
     System.setProperty("HADOOP_USER_NAME", "hdfs") 
     conf.set("hbase.master", "localhost:60000") 
     conf.setInt("timeout", 120000) 
     conf.set("hbase.zookeeper.quorum", "localhost") 
     conf.set("zookeeper.znode.parent", "/hbase-unsecure") 
     conf.setInt("hbase.client.scanner.caching", 10000) 
     sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result])) 
     val jobConfig: JobConf = new JobConf(conf,this.getClass) 
     jobConfig.setOutputFormat(classOf[TableOutputFormat]) 
     jobConfig.set(TableOutputFormat.OUTPUT_TABLE,outputTable) 
     val x = 12 
     val y = 15 
     val z = 25 
     var newarray = Array(x,y,z) 
     val newrddtohbase = sc.parallelize(newarray) 
     val convertFunc = convert _ 
     new PairRDDFunctions(newrddtohbase.map(convertFunc)).saveAsHadoopDataset(jobConfig) 
     sc.stop() 
     } 
     def convert(a:Int) : Tuple2[ImmutableBytesWritable,Put] = { 
       val p = new Put(Bytes.toBytes(a)) 
       p.add(Bytes.toBytes("columnfamily"), 
       Bytes.toBytes("col_1"), Bytes.toBytes(a)) 
       new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(a.toString.getBytes()), p); 
     } 
    } 

P .:コードはテストされていませんが、動作するはずです。

+0

ご回答ありがとうございますが、エラーは同じです。 org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:166)でタスク直列化可能ではない \t: \t org.apacheで –

+0

はあなたのエラーStackkすぎ –

+0

org.apache.spark.SparkExceptionを貼り付けてくださいすることができます。 spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:158) \t at org.apache.spark.SparkContext.clean(SparkContext.scala:1446) \t at org.apache.spark.rdd.RDD.map(RDD .scala:286) –

関連する問題