1

私はspark sqlを使用して、リアルタイム傾向分析のためにzeppelinを使用してカフカから来るデータをクエリしようとしていますが、成功しません。ここzeppelinでspark sqlを使用してSpark StreamingContextをクエリする方法はありますか?

は、私がツェッペリンに

//Load Dependency 
%dep 
    z.reset() 
    z.addRepo("Spark Packages Repo").url("http://repo1.maven.org/maven2/") 
    z.load("org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1") 
    z.load("org.apache.spark:spark-core_2.11:2.0.1") 
    z.load("org.apache.spark:spark-sql_2.11:2.0.1") 
    z.load("org.apache.spark:spark-streaming_2.11:2.0.1" 

//simple streaming 
%spark 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.sql.{Row, SQLContext} 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kafka.KafkaUtils 
import _root_.kafka.serializer.StringDecoder 
import org.apache.spark.sql.SparkSession 

val conf = new SparkConf() 
    .setAppName("clickstream") 
    .setMaster("local[*]") 
    .set("spark.streaming.stopGracefullyOnShutdown", "true") 
    .set("spark.driver.allowMultipleContexts","true") 


val spark = SparkSession 
    .builder() 
    .appName("Spark SQL basic example") 
    .config(conf) 
    .getOrCreate() 

val ssc = new StreamingContext(conf, Seconds(1)) 

val topicsSet = Set("timer") 
val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.25.1:9091,192.168.25.1:9092,192.168.25.1:9093") 

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicsSet).map(_._2) 

lines.window(Seconds(60)).foreachRDD{ rdd => 
    val clickDF = spark.read.json(rdd) //doesn't have to be json 
    clickDF.createOrReplaceTempView("testjson1") 
    //olderway 
    //clickDF.registerTempTable("testjson2") 
    clickDF.show 

} 

lines.print() 
ssc.start() 
ssc.awaitTermination() 

を実行しています簡単なコードスニペットは、私は、各カフカのメッセージを印刷することができていますが、私は単純なSQL %sql select * from testjson1 // or testjson2を実行したときに、私は次のエラー

java.util.NoSuchElementException: None.get 
at scala.None$.get(Option.scala:347) 
at scala.None$.get(Option.scala:345) 
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343) 
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
を取得しています

this postストリーミングデータが照会されています(ツイッターの例)。だから私はカフカストリーミングで可能であるべきだと思っています。だから私は、多分、私は間違っている何かやっているか、いくつかのポイントを逃していると思います

任意のアイデア、提案、勧告は、エラーメッセージが一時ビューが欠落していることを教えてくれない

+0

オリジナルでは、createOrReplaceTempView(...)の代わりにrdd.toDf()。registerTempTable(...)を使用しています。あなたは "古い方法"を試しましたか?また、処理するRDDがいくつかある場合は、各rdd処理結果が前のものを上書きするため、最後のものを選択できるようになります - そうですか? –

+0

私は両方の方法を試しました...しかし、誰も質問するための一時的なビューを作成していません。 – sagarthapa

答えて

1

を歓迎しています。エラーメッセージは、タイプNoneが名前 'get'を持つ要素を提供しないことを示しています。

スパークの場合、RDDに基づく計算は、アクションが呼び出されたときに実行されます。したがって、一時テーブルを作成する時点まで計算は実行されません。すべての計算は、テーブルに対してクエリを実行するときに実行されます。テーブルが存在しない場合、別のエラーメッセージが表示されます。

おそらく、カフカのメッセージが表示される可能性がありますが、例外は「なし」インスタンスが「取得」を知らないことを示しています。だからあなたのソースJSONデータにはデータのないアイテムが含まれていて、これらのアイテムはNoneで表され、したがってsparkが計算を実行している間に実行を引き起こすと思います。

ソリューションが一般的に動作するかどうかを確認するには、空のJSON要素が含まれていないサンプルデータで動作するかどうかをテストすることをお勧めします。

関連する問題