2017-02-06 10 views
-1

ツイートをストリーミングしてSparkSQLを使用してクエリを行うためのzeppelinチュートリアルに従うときは、 'tweets' tempテーブルが見つからない場合はエラーになります。正確なコードが使用されているとのリンクは次のように称するZeppling twitterストリーミングの例、

参考:次の段落でhttps://zeppelin.apache.org/docs/0.6.2/quickstart/tutorial.html

import scala.collection.mutable.HashMap 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.twitter._ 
import org.apache.spark.storage.StorageLevel 
import scala.io.Source 
import scala.collection.mutable.HashMap 
import java.io.File 
import org.apache.log4j.Logger 
import org.apache.log4j.Level 
import sys.process.stringSeqToProcess 

/** Configures the Oauth Credentials for accessing Twitter */ 
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) { 
    val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret) 
    println("Configuring Twitter OAuth") 
    configs.foreach{ case(key, value) => 
    if (value.trim.isEmpty) { 
     throw new Exception("Error setting authentication - value for " + key + " not set") 
    } 
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer") 
    System.setProperty(fullKey, value.trim) 
    println("\tProperty " + fullKey + " set as [" + value.trim + "]") 
    } 
    println() 
} 

// Configure Twitter credentials 
val apiKey = "xxx" 
val apiSecret = "xxx" 
val accessToken = "xx-xxx" 
val accessTokenSecret = "xxx" 

configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret) 

import org.apache.spark.streaming._ 
import org.apache.spark.streaming.twitter._ 


@transient val ssc = new StreamingContext(sc, Seconds(2)) 
@transient val tweets = TwitterUtils.createStream(ssc, None) 
@transient val twt = tweets.window(Seconds(60), Seconds(2)) 

val sqlContext= new org.apache.spark.sql.SQLContext(sc) 
import sqlContext.implicits._ 
case class Tweet(createdAt:Long, text:String) 
twt.map(status=> 
     Tweet(status.getCreatedAt().getTime()/1000, status.getText())).foreachRDD(rdd=> 
     // Below line works only in spark 1.3.0. 
     // For spark 1.1.x and spark 1.2.x, 
     // use rdd.registerTempTable("tweets") instead. 
     rdd.toDF().registerTempTable("tweets") 
) 

ssc.start() 

、私が持っている、次の例外をスローするSQLのSELECT文

%sql select createdAt, count(1) from tweets group by createdAt order by createdAt 

org.apache.spark.sql.AnalysisException: Table not found: tweets; 
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:305) 

答えて

0

makiが実行している上記の例を得ることができました次の編集を行います。代わりに、「第二パラでSparkSQL error Table Not Found

REFをこの変更が原因スパーク(V1.6.3)のバージョンアップや、私が欠落している可能性があり、いくつかの他の基盤となるアーキテクチャのニュアンスに必要となった場合には、わからないけど、eitherways SQL構文として直接呼び出す場合は、次のようにsqlContextを使用してみてください。

val my_df = sqlContext.sql("SELECT * from sweets LIMIT 5") 
my_df.collect().foreach(println) 
関連する問題