2016-10-09 4 views
0

私はスパークストリーミングでTwitterの統合を学んでいます。スパークからのTwitterデータ

import org.apache.spark.streaming.{Seconds, StreamingContext} 
    import org.apache.spark.SparkContext._ 
    import org.apache.spark.streaming.twitter._ 
    import org.apache.spark.SparkConf 

    /** 
    * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter 
    * stream. The stream is instantiated with credentials and optionally filters supplied by the 
    * command line arguments. 
    * 
    * Run this on your local machine as 
    * 
    */ 
    object TwitterPopularTags { 
     def main(args: Array[String]) { 


     if (args.length < 4) { 
      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + 
      "<access token> <access token secret> [<filters>]") 
      System.exit(1) 
     } 

     StreamingExamples.setStreamingLogLevels() 

     val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 
     val filters = args.takeRight(args.length - 4) 

     // Set the system properties so that Twitter4j library used by twitter stream 
     // can use them to generat OAuth credentials 
     System.setProperty("twitter4j.oauth.consumerKey", consumerKey) 
     System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) 
     System.setProperty("twitter4j.oauth.accessToken", accessToken) 
     System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) 

     val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[2]") 
     val ssc = new StreamingContext(sparkConf, Seconds(2)) 
     val stream = TwitterUtils.createStream(ssc, None, filters)//Dstream 

     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) 

     val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) 
         .map{case (topic, count) => (count, topic)} 
         .transform(_.sortByKey(false)) 

     val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) 
         .map{case (topic, count) => (count, topic)} 
         .transform(_.sortByKey(false)) 


     // Print popular hashtags 
     topCounts60.foreachRDD(rdd => { 
      val topList = rdd.take(10) 
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) 
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} 
     }) 

     topCounts10.foreachRDD(rdd => { 
      val topList = rdd.take(10) 
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) 
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} 
     }) 

     ssc.start() 
     ssc.awaitTermination() 
     } 
    } 

私は完全には、以下の2つのコード行理解することはできませんよ。

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 
val filters = args.takeRight(args.length - 4) 

を誰かが私にこれらの2行を説明していただけますか?

おかげで、よろしく、

答えて

2
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 

argsは配列です。 take(4)は、最初の(一番左の)4つの要素を持つサブ配列を返します。これら4つの要素をArray(consumerKey, consumerSecret, accessToken, accessTokenSecret)に代入すると、値consumerKeyに最初の要素の値が保持されます。 consumerSecretは2番目の値を保持します。以下同様です。これは、名前付きの値に配列(他のコレクションでもできる)を「アンパックする」素朴なScalaのトリックです。

val filters = args.takeRight(args.length - 4) 

takeRight(n)配列の最後n要素を意味し、右からサブアレイを返します。ここでは、最初の4つの要素を除くすべてを持つ配列が、filtersという名前の新しい値に割り当てられます。

+0

説明のためにTzachさんに感謝します。それは多くの助けになる – subho

関連する問題