2017-10-03 28 views
-2

私はSpark-shellを使用しています.Spark-shellを使用して感情分析を実行するには、Kafkaトピックのツイートを保存しました。Value tailは(String、String)のメンバーではありません

I追加した依存性: org.apache.spark:火花ストリーミングkafka_2.10:1.6.2 edu.stanford.nlp:スタンフォード-corenlp:3.5.1

これらは上のコードでありますこれは私が働いています:

import org.apache.spark._ 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.kafka._ 
val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaReceiver") 
val ssc = new StreamingContext(conf, Seconds(5)) 
val kafkaStream = KafkaUtils.createStream(ssc, "sandbox.hortonworks.com:2181","test-consumer-group", Map("test12" -> 5)) 
val topCounts60 = kafkaStream.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map { case (topic, count) => (count, topic) }.transform(_.sortByKey(false)) 
    topCounts60.foreachRDD(rdd => { 
     val topList = rdd.take(10) 
     println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) 
     topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) } 
    }) 
kafkaStream.count().map(cnt => "Received " + cnt + " kafka messages.").print() 
val wordSentimentFilePath = "hdfs://sandbox.hortonworks.com:8020/TwitterData/AFINN.txt" 
    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line => 
    val Array(word, happiness) = line.split("\t") 
    (word, happiness) 
    } cache() 
val happiest60 = kafkaStream.map(hashTag => (hashTag.tail, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)). transform{topicCount => wordSentiments.join(topicCount)} 
       .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false)) 
ssc.start() 
ssc.stop() 

をしかし、これらの行を実行しながら、

val happiest60 = kafkaStream.map(hashTag => (hashTag.tail,1)).reduceByKeyAndWindow(_ + _, Seconds(60)). transform{topicCount => wordSentiments.join(topicCount)}.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false)) 

それはエラーがスローされます。

+0

すべての変数の型を宣言してください。それはあなたが間違っている場所を把握するのに役立ちます。 – Dima

答えて

0

おそらくhashTagのタイプは(String, String)なので、tail操作は定義されていません。 tailは、コレクションで定義された関数であり、タプルではありません。

map操作は、ストリームから受け取った単一の項目に対して実行されます。カフカストリームにタイプが(String, String)のアイテムが含まれている場合、それは正常です。

+0

あなたのお返事ありがとうございます.... – Priyal

関連する問題