私はspark sqlを使用して、リアルタイム傾向分析のためにzeppelinを使用してカフカから来るデータをクエリしようとしていますが、成功しません。ここzeppelinでspark sqlを使用してSpark StreamingContextをクエリする方法はありますか?
は、私がツェッペリンに
//Load Dependency
%dep
z.reset()
z.addRepo("Spark Packages Repo").url("http://repo1.maven.org/maven2/")
z.load("org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1")
z.load("org.apache.spark:spark-core_2.11:2.0.1")
z.load("org.apache.spark:spark-sql_2.11:2.0.1")
z.load("org.apache.spark:spark-streaming_2.11:2.0.1"
//simple streaming
%spark
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
val conf = new SparkConf()
.setAppName("clickstream")
.setMaster("local[*]")
.set("spark.streaming.stopGracefullyOnShutdown", "true")
.set("spark.driver.allowMultipleContexts","true")
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config(conf)
.getOrCreate()
val ssc = new StreamingContext(conf, Seconds(1))
val topicsSet = Set("timer")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.25.1:9091,192.168.25.1:9092,192.168.25.1:9093")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet).map(_._2)
lines.window(Seconds(60)).foreachRDD{ rdd =>
val clickDF = spark.read.json(rdd) //doesn't have to be json
clickDF.createOrReplaceTempView("testjson1")
//olderway
//clickDF.registerTempTable("testjson2")
clickDF.show
}
lines.print()
ssc.start()
ssc.awaitTermination()
を実行しています簡単なコードスニペットは、私は、各カフカのメッセージを印刷することができていますが、私は単純なSQL %sql select * from testjson1 // or testjson2
を実行したときに、私は次のエラー
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
を取得しています
this postストリーミングデータが照会されています(ツイッターの例)。だから私はカフカストリーミングで可能であるべきだと思っています。だから私は、多分、私は間違っている何かやっているか、いくつかのポイントを逃していると思います
任意のアイデア、提案、勧告は、エラーメッセージが一時ビューが欠落していることを教えてくれない
オリジナルでは、createOrReplaceTempView(...)の代わりにrdd.toDf()。registerTempTable(...)を使用しています。あなたは "古い方法"を試しましたか?また、処理するRDDがいくつかある場合は、各rdd処理結果が前のものを上書きするため、最後のものを選択できるようになります - そうですか? –
私は両方の方法を試しました...しかし、誰も質問するための一時的なビューを作成していません。 – sagarthapa