2016-04-26 4 views
3

こんにちはメンバーではありません。また、出力を生成する前にマップを使用してデータ値をルックアップする必要があります。ここにコードがあります。reduceByKeyは、私は単にドキュメントからワードカウントを取得するコードがある

requests 
    .filter(_.description.exists(_.length > 0)) 
    .flatMap { case request => 
     broadcastDataMap.value.get(request.requestId).map { 
     data => 
      val text = Seq(
      data.name, 
      data.taxonym, 
      data.pluralTaxonym, 
      request.description.get 
     ).mkString(" ") 
      getWordCountsInDocument(text).map { case (word, count) => 
      (word, Map(request.requestId -> count)) 
      } 
     } 
    } 
    .reduceByKey(mergeMap) 

エラーメッセージが

reduceByKey is not a member of org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,scala.collection.immutable.Map[Int,Int]]] 

どのように私はこの問題を解決することができますか?私はgetWordCountsInDocumentを呼び出す必要があります。ありがとう!

+0

あなたはPairRDDを得る必要があります。 reduceByKeyの前に.map()を使用してみてください – Natalia

答えて

3

reduceByKeyは、基本的にはフォームRDD[(K, V)]でRDDSに暗黙的に追加される、PairRDDFunctionsのメンバーです。おそらく構造体を平らにしてRDD[String, Map[Int,Int]]にする必要があります。

入力の種類(requestsbroadcastDataMapmergeMap)を入力できる場合は、その変換に関するヘルプを提供することができます。提供タイプ、およびgetWordCountsInDocumentの戻り値の型は、いくつかのコレクションであるという仮定から

[(ワード、カウント:int)を]

を変更する:

broadcastDataMap.value.get(request.requestId).map { 

broadcastDataMap.value.get(request.requestId).flatMap { 

で問題を修正する必要があります。

+0

ありがとうございました。 #1:mergeMapは、2つのマップを取る関数であるの[int:broadcastDataMapはマップ[requestID、データ(名前、taxonym、pluralTaxonym)]#3で放送される:要求が(requestId、説明)#2を有する要求のRDD、あります、INT]一の地図を返す[INT、INT]:プライベートデフmergeMap(MAP1:地図[INT、INT]、MAP2:地図[INT、INT]):地図[INT、INT] = { (MAP1 ++ MAP2 ).MAP {場合(キー、_)=> (キー、map1.getOrElse(鍵、0)+ map2.getOrElse(鍵、0))} } – RandomBookmark

関連する問題