0
にあります。スパークストリーミングを使用してAPI saveAsHadoopDataset(jobConf)
でHbaseにデータを書き込むと、警告ログが表示されます。 ?ここ は私のコードです:WARN FileOutputCommitter:出力パスがsetupJob()で
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(args(0)).setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(1))
val wordcountDStream: DStream[(String, Int)] =
ssc.socketTextStream(args(1), args(2).toInt).flatMap(x => x.split(" "))
.map((_, 1))
.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(10), Seconds(3))
var jobConf = new JobConf(HBaseConfiguration.create())
jobConf.set("hbase.zookeeper.quorum", args(3))
jobConf.set("zookeeper.znode.parent", "/hbase")
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "SparkWordCount")
val resultDstream = wordcountDStream.repartition(1)
.foreachRDD(rdd => {
rdd.sortBy(_._2, false).zipWithUniqueId().filter(_._2 < 5).map(triple => {
var put = new Put(Bytes.toBytes((triple._2 + 1).toString))
put.addColumn(Bytes.toBytes("result"), Bytes.toBytes("word"), Bytes.toBytes(triple._1._1))
put.addColumn(Bytes.toBytes("result"), Bytes.toBytes("count"), Bytes.toBytes(triple._1._2.toString))
(new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
})
ssc.start()
ssc.awaitTermination()
と私のコンソールログ:!?
17/06/20 22:06:15 WARN FileOutputCommitter: Output Path is null in setupJob()
17/06/20 22:06:24 WARN FileOutputCommitter: Output Path is null in commitJob()
はあまり
と私のHBaseの – Gpwner