I am trying to stream JSONArray data with Spark Streaming; each JSONArray contains several JSONObjects. How can I extract every JSONObject from the JSONArray and save it to Cassandra from Spark Streaming?
I would like to put each JSONObject into a DataFrame, map it against another table, and then save it to a Cassandra table.
I tried to create a DataFrame to hold the JSONObjects, but creating the DataFrame inside stream.foreachRDD throws a NullPointerException. Is this because Spark does not support nested RDDs? If so, how can I save the JSONObjects to Cassandra?
The data format is as follows:
[
  {
    "temperature":"21.8",
    "humidity":"65.6",
    "creatime":"2016-11-14 13:50:24",
    "id":"3303136",
    "msgtype":"th",
    "sensorID":"001"
  },
  {
    "temperature":"23.1",
    "humidity":"60.6",
    "creatime":"2016-11-14 13:50:24",
    "id":"3303137",
    "msgtype":"th",
    "sensorID":"002"
  }
]
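For reference, each element of the array could be modeled as a plain case class. The field names below are taken directly from the sample payload, and everything is kept as String to mirror the JSON; this is only an illustrative sketch, not part of the original code:

// Illustrative model of one element of the JSON array above.
// Field names match the sample; all values are kept as String, mirroring the JSON.
case class SensorReading(
  sensorID: String,
  humidity: String,
  temperature: String,
  msgtype: String,
  creatime: String,
  id: String)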
My code:
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import com.datastax.spark.connector.mapper.DefaultColumnMapper
import com.datastax.spark.connector._
import org.apache.spark.SparkConf
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import net.sf.json.JSONObject
import net.sf.json.JSONArray

object getkafkadata {
  def main(args: Array[String]) {
    val cassandraHostIP = "10.2.1.67"
    val keyspaceToGet = "iot_test"

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("PageViewStream")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.cassandra.connection.host", cassandraHostIP)

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val sqc = new SQLContext(sc)
    val sqlContext = SQLContextSingleton.getInstance(sc)
    import sqlContext.implicits._

    val cc = new CassandraSQLContext(sc)
    cc.setKeyspace(keyspaceToGet)

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "10.2.1.67:6667",
      "group.id" -> "a13",
      "auto.offset.reset" -> "smallest")
    val topics = Set("test1208")
    println("kafkaParams=", kafkaParams, "topics=", topics)

    val offsetsList = 0
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    println("Line3 good!")
    println("Start to parse json...")

    val datas = stream.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(line => {
          val event = JSONArray.fromObject(line._2)
          for (n <- 0 to event.size() - 1) {
            val eventobj = event.getJSONObject(n)
            println("======= Message =======")
            println(eventobj.toString())
            // default values in case a field is missing from the message
            var sensorID = "no_data"
            var humidity = "0"
            var temperature = "0"
            var msgtype = "no_data"
            var creatime = "0"
            var id = "no_data"
            if (eventobj.has("sensorID"))
              sensorID = eventobj.getString("sensorID")
            if (eventobj.has("humidity"))
              humidity = eventobj.getString("humidity")
            if (eventobj.has("temperature"))
              temperature = eventobj.getString("temperature")
            if (eventobj.has("msgtype"))
              msgtype = eventobj.getString("msgtype")
            if (eventobj.has("creatime"))
              creatime = eventobj.getString("creatime")
            if (eventobj.has("id"))
              id = eventobj.getString("id")
            // the NullPointerException below is thrown from this createDataFrame call
            var df = cc.createDataFrame(Seq(
              (sensorID, humidity, temperature, msgtype, creatime, id)))
              .toDF("sensorID", "humidity", "temperature", "msgtype", "creatime", "id")
            println("==========df create done=========")
            df.show()
          }
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
Exception message:
16/12/12 09:28:35 ERROR JobScheduler: Error running job streaming job 1481506110000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:75)
at com.test.spark.mapping.getkafkadata$$anonfun$1.apply(getkafkadata.scala:74)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:573)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:432)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVI$sp(getkafkadata.scala:109)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:78)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(getkafkadata.scala:76)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:76)
at com.test.spark.mapping.getkafkadata$$anonfun$1$$anonfun$apply$2.apply(getkafkadata.scala:75)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
... 3 more
16/12/12 09:28:35 INFO DAGScheduler: ResultStage 1 (foreachPartition at getkafkadata.scala:75) finished in 0.063 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NullPointerException
(stack trace identical to the one above)
16/12/12 09:28:35 INFO DAGScheduler: Job 1 finished: foreachPartition at getkafkadata.scala:75, took 0.098511 s
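For what it's worth, the NullPointerException in SQLConf.getConf is raised when cc.createDataFrame is called inside foreachPartition: the CassandraSQLContext is created on the driver and is not usable inside executor-side closures, so the problem looks less like nested RDDs and more like using a driver-side context on the workers. Below is a minimal sketch of one way around it, assuming a Cassandra table iot_test.sensor_th whose columns match the fields in the sample data (the table name and column names are assumptions, not from the original post): parse the JSON on the executors with a plain flatMap and let the connector's saveToCassandra do the write, which needs no SQLContext.

// Parse every Kafka message (a JSON array) into one tuple per JSONObject and
// write the resulting RDD with the Cassandra connector instead of a DataFrame.
// Table name "sensor_th" and its column names are assumptions for illustration.
stream.foreachRDD { rdd =>
  val rows = rdd.flatMap { case (_, json) =>
    val arr = JSONArray.fromObject(json)
    (0 until arr.size()).map { i =>
      val o = arr.getJSONObject(i)
      // fall back to the same defaults as the original code when a field is missing
      def field(name: String, default: String) =
        if (o.has(name)) o.getString(name) else default
      (field("sensorID", "no_data"),
       field("humidity", "0"),
       field("temperature", "0"),
       field("msgtype", "no_data"),
       field("creatime", "0"),
       field("id", "no_data"))
    }
  }
  // saveToCassandra comes from com.datastax.spark.connector._ (already imported)
  // and maps the tuple elements to the listed columns positionally.
  rows.saveToCassandra("iot_test", "sensor_th",
    SomeColumns("sensorid", "humidity", "temperature", "msgtype", "creatime", "id"))
}

If the mapping against the other table is still needed, a DataFrame can be built from the whole RDD at the top level of foreachRDD, where the driver-side context is valid, rather than per record inside foreachPartition.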
Thank you! Your example really helps. – gogocatmario