2017-09-04 4 views
0

のための変数の内部にアクセスすることはできません。所望の出力は、イベントのグループ当たりの発生数を得ることである。この最初の例では、出力は次のようになります。Scalaは、私は、次のRDDを持つループ

{ "A" -> 6 , "B" -> 6 } 

私のコードでは、私は、所望の出力を得る:

val rdd = sqlContext.sparkContext.makeRDD(Seq(
"1:AAAAABAAAAABAAAAABAAAAAB","2:BBAAAAAAAAAABBAAAAAAAAAA")) 
val rddSplited = rdd.map(_.split(":")(1).toList) 
    val values = scala.collection.mutable.Map[String, Long]() 
    var iteracion = 0 
    for (ocurrences <- rddSplited) { 
     var previousVal = "0" 
     for (listValues <- ocurrences) { 
     if (listValues.toString != previousVal) { 
      values.get(listValues.toString) match { 
      case Some(e) => values.update(listValues.toString, e + 1) 
      case None => values.put(listValues.toString, 1) 
      } 
      previousVal = listValues.toString() 
     } 
     } 
     //println(values) //return the values 

    } 
     println(values) //returns an empty Map 

    } 

問題は、それが

のprintln(値)

です

はデータを返しませんが、コメント付きのprintlnが配置されたときに変更すると、マップ値は値を返します。

メインループの後にマップの最終値を返すにはどうすればよいですか?

私の実装が最良でない場合は、このScala/Sparkの新機能です。

ありがとうございます。

私が達成しようとしていることをよりよく説明するための質問を編集しています。 コードは答えに(あなたの助けを借りてありがとう)返します。私はイベントの数をカウントするつもりはないよ、私は必要なものイベントが別のものに変化したとき、すなわち出現の数を数えることである。

AAAAABAAAAABAAAAABAAAAAB => A-> 4 , B-> 4 
    BBAAAAAAAAAABBAAAAAAAAAA => A-> 2 , B-> 2 

So the final output should be A-> 6 , B-> 6 

私は誤解のために、本当にごめんなさい。

+1

Sparkでは、推奨されない単一のJVM Scalaコード(そのような突然変異/副作用)でも... – cchantep

答えて

2

あなたは非常にJavaのような方法で結果を達成しようとしているようです。私は次のように正確に何をしたいんScalaの機能的なスタイルのプログラムを書いている:

val rdd = sqlContext.sparkContext.makeRDD(Seq("1:AAAAABAAAAABAAAAABAAAAAB","2:BBAAAAAAAAAABBAAAAAAAAAA")) 

rdd.foreach{elem => 
    val splitted = elem.split(":") 
    val out: Seq[Map[Char, Int]] = splitted 
     .tail 
     .toSeq 
     .map(_.groupBy(c => c).map{case (key, values) => key -> values.length}) 
    println(out) 
    } 
+0

自分自身をよりよく説明しようとすると、新しい詳細が追加されました。 – AJDF

0

あなたのコード(相互状態、怠惰な変換)を持つ複数の問題があり、これを試してみてください。

val rdd = ss.sparkContext.makeRDD(Seq("1:AAAAABAAAAABAAAAABAAAAAB","2:BBAAAAAAAAAABBAAAAAAAAAA")) 

rdd.foreach{record => 
    val Array(_,events) = record.split(":") 
    val eventCount = events.groupBy(identity).mapValues(_.size) 
    println(eventCount) 
    } 

foreachmapが遅延している)の代わりにmapを使用すると、printlnが表示されないことに注意してください。また、printlnは、クラスタのワーカーノードの標準出力に移動します。これは、sparkでlocalモードを使用した場合にのみ表示されます。

+0

私は自分自身をよりよく説明しようと、新しい詳細を追加しました。 – AJDF

関連する問題