2017-04-19 2 views
0

私はテキストファイルのRDDをループし、ファイル内の各ユニークワードの集計を行い、それぞれのユニークワードに続くすべてのワードとそのカウントを累積しようとしています。これまでのところ、これは私が持っているものです。(String、(String、Int))として設定されたタプルの値のキーにreduceByKeyを使用する方法は?

// connecting to spark driver 
val conf = new SparkConf().setAppName("WordStats").setMaster("local") 
val spark = new SparkContext(conf) //Creates a new SparkContext object 

//Loads the specified file into an RDD 
val lines = sparkContext.textFile(System.getProperty("user.dir") + "/" + "basketball_words_only.txt") 

//Splits the file into individual words 
val words = lines.flatMap(line => { 

    val wordList = line.split(" ") 

    for {i <- 0 until wordList.length - 1} 

    yield (wordList(i), wordList(i + 1), 1) 

}) 

Output Generated By My Current MapReduce Program

私はこれまで明らかにされていない場合、私は何をしようとしていることで、各単語に続く単語の集合を蓄積することですその単語が続く回数と一緒に、

答えて

0

私が正しくあなたを理解していれば、私たちはこのようなものがあります:

val lines: Seq[String] = ... 
val words: Seq[(String, String, Int)] = ... 

をそして、我々はこのような何かしたい:

val frequencies: Map[String, Seq[(String, Int)]] = { 
    words 
    .groupBy(_._1)      // word -> [(w, next, cc), ...] 
    .mapValues { values => 
     values 
     .map { case (w, n, cc) => (n, cc) } 
     .groupBy(_._1)     // next -> [(next, cc), ...] 
     .mapValues(_.reduce(_._2 + _._2)) // next -> sum 
     .toSeq 
    } 
} 
+0

を@adu、ありがとうございます!あなたは私が探しているものを正確に理解しているようですが、自分自身でこのコードを追加しようとすると、次のようなコンパイル時に2つのエラーが発生します。エラー:(92,36)タイプの不一致。 が見つかりました: 必須:(String、Int) .mapValues(_。reduce(_._ 2 + _._ 2))// next - > sum
エラー:(93,12)タイプの不一致。 が見つかりました:Seq [(String、(String、Int))] 必須:Seq [(String、Int)] コードが85〜95行目に配置された.toSeq – JGT

関連する問題