2017-04-20 17 views
0

hbaseにストリーミングデータを挿入したい。 これは私のコードです:スパークストリーミングhbaseエラー

val tableName = "streamingz" 
val conf = HBaseConfiguration.create() 
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml")) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 

val admin = new HBaseAdmin(conf) 
if (!admin.isTableAvailable(tableName)) { 
    print("-----------------------------------------------------------------------------------------------------------") 
    val tableDesc = new HTableDescriptor(tableName) 
    tableDesc.addFamily(new HColumnDescriptor("z1".getBytes())) 
    tableDesc.addFamily(new HColumnDescriptor("z2".getBytes())) 
    admin.createTable(tableDesc) 
} else { 
    print("Table already exists!!--------------------------------------------------------------------------------------") 
} 
val ssc = new StreamingContext(sc, Seconds(10)) 
val topicSet = Set("fluxAstellia") 
val kafkaParams = Map[String, String]("metadata.broker.list" - > "10.32.201.90:9092") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
val lines = stream.map(_._2).map(_.split(" ", -1)).foreachRDD(rdd => { 
    if (!rdd.partitions.isEmpty) { 
     val myTable = new HTable(conf, tableName) 
     rdd.map(rec => { 
      var put = new Put(rec._1.getBytes) 
      put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec._2)) 
      myTable.put(put) 
     }).saveAsNewAPIHadoopDataset(conf) 
     myTable.flushCommits() 
    } else { 
     println("rdd is empty") 
    } 

}) 


ssc.start() 
ssc.awaitTermination() 

} 
} 

私はこのエラーを得た:

:66: error: value _1 is not a member of Array[String] 
     var put = new Put(rec._1.getBytes) 

私はこのエラーを修正することはできませんので、どのように初心者だ、と私は疑問があります。

テーブルを正確に作成します。ストリーミングプロセスの外側か内部か?

ありがとう

答えて

0

あなたのエラーはあなただけ(値のキーと_2用_1)、地図やタプルに_n呼び出すことができますラインvar put = new Put(rec._1.getBytes) に基本的です。
recは、ストリーム内の文字列をスペース文字で分割した文字列配列です。あなたが最初の要素の後にいたなら、var put = new Put(rec(0).getBytes)と書いてください。同様に、次の行では、put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec(1)))

+0

と書いて、hbaseテーブルを作成してみましょう。どこでそれを作りますか? –

+0

私はこの新しいエラーの兄弟を取得しました 'ERROR JobScheduler:ジョブストリーミングジョブを実行中にエラーが発生しました1492790490000 ms.0 org.apache.spark.SparkException:タスクがシリアライズ不能です' –

+0

スタックトレースでどのクラスがシリアライズできないかを教えてください。あなたのmap()クロージャにあるものは、シリアライズ可能でなければなりません。私の推測では、HTableは直列化できません。 '' myTable = new HTable(conf、tableName)''をjava.io.Serializable'に置き換えるか、 '@transient lazy'としてマークすることで、シリアライズ可能にすることができます。やってみたいです。 – sparker

関連する問題