2016-06-29 11 views
0

大RDDSを扱うことができないコードされていますGROUPBYはここ

val words = sc.textFile("/Users/kaiyin/IdeaProjects/learnSpark/src/main/resources/eng_words.txt") 
words.take(1000000).foreach(println _) 
words.take(150000).groupBy((x: String) => x.head).map { 
    case (c, iter) => (c, iter.toList.size) 
}.foreach { 
    println _ 
} 

eng_words.txtは約100万英単語、1行につき1つを含むテキストファイルです。 RDDが150000以上になるならば、groupByは、このエラーでクラッシュします:

java.util.NoSuchElementException: next on empty iterator 
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:39) 
    at scala.collection.Iterator$$anon$2.next(Iterator.scala:37) 
    at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63) 
    at scala.collection.IterableLike$class.head(IterableLike.scala:107) 
    at scala.collection.immutable.StringOps.scala$collection$IndexedSeqOptimized$$super$head(StringOps.scala:30) 
    at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126) 
    at scala.collection.immutable.StringOps.head(StringOps.scala:30) 
    at $anon$1$$anonfun$run$1.apply(<console>:23) 
    at $anon$1$$anonfun$run$1.apply(<console>:23) 
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:332) 
    at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:331) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:331) 
    at scala.collection.mutable.ArrayOps$ofRef.groupBy(ArrayOps.scala:186) 
    at $anon$1.run(<console>:23) 
    at Helper.HasRun$class.newRun(HasRun.scala:21) 
    at $anon$1.newRun(<console>:19) 
    ... 55 elided 

何が悪かったのか?

答えて

3

この特定のケースでは、おそらく空の文字列を処理できません。それにもかかわらず、groupByを入力しないでください。toListに電話をしてはいけません。

  • headは、各キーはエグゼキュータのメモリに収まるようにするためにすべてのレコードを必要と

  • groupBy同じgroupByKeyとして表示されるエラーと空の行に失敗します。

さらに別の単語を数える何がここに持っている:、

words 
    // Make sure that it won't fail on empty string with 
    // java.util.NoSuchElementException: next on empty iterator 
    .flatMap(_.headOption) 
    // Map to pairs and reduce to avoid excessive shuffling and limit memory usage 
    .map((_, 1)) 
    .reduceByKey(_ + _) 
+1

待ちということは正しいでしょうか?私は、エグゼキュータが十分なメモリにアクセスできない場合、ディスクに必要なものをシームレスに流出させることができたと思いました。それには大きな減速がありますが、それでも機能するはずです。それにもかかわらず、 'reduceByKey'は、あなたが提案したように、実行者が残りを実行する前に集計を実行できるので、より良いものになります。 – Jeff

+1

@JeffL。あなたはここで部分的に正しいです。唯一のキーではなく、必要に応じてデータが流出することがあります。値はArrayBufferのバリエーションに過ぎず、メモリに収まる必要があります。 – zero323

+1

これは面白いです、ありがとう。あなたがこのような詳細に遭遇したときに、スパークが時にはボンネットの下で行うことについての良い情報を見つけるのは難しいです。また、このうちのいくつかは、あなたがエグゼキュータにクラスタ用に割り当てたリソースによって軽減される可能性があります。これは、私が間違っていなければ、 'reduceByKey'には相当するデータフレームがないので、主に重要と思われます。 @ JeffL。 – Jeff