2017-06-15 20 views
0

私はスパークストリーミングを使用してTwitterからデータをストリーミングしようとしていました。しかし、
以下の問題があります。スパークストリーミングTwitter createStream問題

import org.apache.spark.streaming.twitter._ 
import twitter4j.auth._ 
import twitter4j.conf._ 
import org.apache.spark.streaming.{Seconds,StreamingContext} 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
val ssc = new StreamingContext(sc, Seconds(10)) 
val cb = new ConfigurationBuildercb.setDebugEnabled(true).setOAuthConsumerKey("").setOAuthConsumerSecret("").setOAuthAccessToken ("").setOAuthAccessTokenSecret("") 
val auth = new OAuthAuthorization(cb.build) 
val tweets = TwitterUtils.createStream(ssc,auth) 

エラー画面:

val tweets = TwitterUtils.createStream(ssc,auth) 
<console>:49: error: overloaded method value createStream with alternatives: 
    (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,twitterAuth: twitter4j.auth.Authorization)org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and> 
    (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,filters: Array[String])org.apache.spark.streaming.api.java.JavaReceiverInputDStream[twitter4j.Status] <and> 
    (ssc: org.apache.spark.streaming.StreamingContext,twitterAuth: Option[twitter4j.auth.Authorization],filters: Seq[String],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[twitter4j.Status] 
cannot be applied to (org.apache.spark.streaming.StreamingContext, twitter4j.auth.OAuthAuthorization) 
     val tweets = TwitterUtils.createStream(ssc,auth) 
+0

はここからいくつかのインスピレーションを描くお気軽に:https://github.com/stefanobaghino/spark-twitter-stream-example – stefanobaghino

+0

おかげで@ステファノバギノ –

答えて

1

問題のメソッドこのシグネチャがあります。

def createStream(
    ssc: StreamingContext, 
    twitterAuth: Option[Authorization], 
    filters: Seq[String] = Nil, 
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
) 

我々はssc: StreamingContexttwitterAuth: Option[Authorization]が必須であることを確認することができます。他の2つはオプションです。

あなたのケースでは、twitterAuthタイプが正しくありません。それはOption[Authorization]です。呼び出しは、このケースでは、次のようになります。

val tweets = TwitterUtils.createStream(ssc, Some(auth)) 
0
import org.apache.spark._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.twitter._ 
import org.apache.spark.streaming.StreamingContext._ 


object TwitterStream { 

def setupLogging() = { 
import org.apache.log4j.{Level, Logger} 
val rootLogger = Logger.getRootLogger() 
rootLogger.setLevel(Level.ERROR) 
} 

/** Configures Twitter service credentials using twiter.txt in the main 
workspace directory */ 
def setupTwitter() = { 
import scala.io.Source 

for (line <- Source.fromFile("/Users/sampy/twitter.txt").getLines) { 
    val fields = line.split(" ") 
    if (fields.length == 2) { 
    System.setProperty("twitter4j.oauth." + fields(0), fields(1)) 
    } 
} 
} 

/** Our main function where the action happens */ 
def main(args: Array[String]) { 

setupTwitter() 


val ssc = new StreamingContext("local[*]", 
"PopularHashtags",Seconds(5)) 

setupLogging() 

val tweets = TwitterUtils.createStream(ssc, None) 
val engTweets = tweets.filter(x => x.getLang() == "en") 

val statuses = engTweets.map(status => status.getText) 

val tweetwords = statuses.flatMap(tweetText => tweetText.split(" ")) 

val hashtags = tweetwords.filter(word => word.startsWith("#")) 

val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1)) // 


val hashtagCounts = 
hashtagKeyValues.reduceByKeyAndWindow((x:Int,y:Int)=>x+y, Seconds(5), 
Seconds(20)) 
val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => 
x._2, false)) 
sortedResults.saveAsTextFiles("/Users/sampy/tweetsTwitter","txt") 

sortedResults.print 



ssc.checkpoint("/Users/sampy/checkpointTwitter") 
ssc.start() 
ssc.awaitTermination() 
} 
} 
関連する問題