私はZeppelinでTwitterストリーミングの例を実行しようとしています。私が検索した後、Sparkインタプリタに「org.apache.bahir:spark-streaming-twitter_2.11:2.0.0」を追加しました。だから、のように、私は、最初の部分の作業を行うことができます。今、私のように後半を追加しようとしていますZeppelin Twitterストリーミングの例が動作しない
Apache Zeppelin 0.6.1: Run Spark 2.0 Twitter Stream App
:
case class Tweet(createdAt:Long, text:String, screenName:String)
twt.map(status=>
Tweet(status.getCreatedAt().getTime()/1000, status.getText(), status.getUser().getScreenName())
).foreachRDD(rdd=>
rdd.toDF().registerTempTable("tweets")
)
今、私はエラーを得た:
<console>:56: error: not found: type StreamingContext
val ssc = new StreamingContext(sc, Seconds(2))
^
<console>:56: error: not found: value Seconds
val ssc = new StreamingContext(sc, Seconds(2))
^
<console>:61: error: not found: value Seconds
val twt = tweets.window(Seconds(60))
実際にケースラインを追加しましたが、上記のエラーが発生しました。私は本当にここで何が起こったのか分かりませんでした。
いずれも、ここに任意の手掛かりを持っていますか? 2.0.0 ツェッペリン:0.6.2
どうもありがとうここ
は詳細 スパークです。
============================================== =======================
// All codes for your reference:
import org.apache.spark.streaming.twitter
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
import org.apache.spark.SparkConf
// ********************************* Configures the Oauth Credentials for accessing Twitter ****************************
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {...}
// ***************************************** Configure Twitter credentials ********************************************
val apiKey = ...
val apiSecret = ...
val accessToken = ...
val accessTokenSecret = ...
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
// ************************************************* The logic itself *************************************************
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))
twt.print
// above codes work correctly
// If added the following line, it failed with the above error
case class Tweet(createdAt:Long, text:String, screenName:String)