0
hbase
にストリーミングデータを挿入したい。 これは私のコードです:スパークストリーミングhbaseエラー
val tableName = "streamingz"
val conf = HBaseConfiguration.create()
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml"))
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
print("-----------------------------------------------------------------------------------------------------------")
val tableDesc = new HTableDescriptor(tableName)
tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
tableDesc.addFamily(new HColumnDescriptor("z2".getBytes()))
admin.createTable(tableDesc)
} else {
print("Table already exists!!--------------------------------------------------------------------------------------")
}
val ssc = new StreamingContext(sc, Seconds(10))
val topicSet = Set("fluxAstellia")
val kafkaParams = Map[String, String]("metadata.broker.list" - > "10.32.201.90:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
val lines = stream.map(_._2).map(_.split(" ", -1)).foreachRDD(rdd => {
if (!rdd.partitions.isEmpty) {
val myTable = new HTable(conf, tableName)
rdd.map(rec => {
var put = new Put(rec._1.getBytes)
put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec._2))
myTable.put(put)
}).saveAsNewAPIHadoopDataset(conf)
myTable.flushCommits()
} else {
println("rdd is empty")
}
})
ssc.start()
ssc.awaitTermination()
}
}
私はこのエラーを得た:
:66: error: value _1 is not a member of Array[String]
var put = new Put(rec._1.getBytes)
私はこのエラーを修正することはできませんので、どのように初心者だ、と私は疑問があります。
テーブルを正確に作成します。ストリーミングプロセスの外側か内部か?
ありがとう
と書いて、hbaseテーブルを作成してみましょう。どこでそれを作りますか? –
私はこの新しいエラーの兄弟を取得しました 'ERROR JobScheduler:ジョブストリーミングジョブを実行中にエラーが発生しました1492790490000 ms.0 org.apache.spark.SparkException:タスクがシリアライズ不能です' –
スタックトレースでどのクラスがシリアライズできないかを教えてください。あなたのmap()クロージャにあるものは、シリアライズ可能でなければなりません。私の推測では、HTableは直列化できません。 '' myTable = new HTable(conf、tableName)''をjava.io.Serializable'に置き換えるか、 '@transient lazy'としてマークすることで、シリアライズ可能にすることができます。やってみたいです。 – sparker