2016-10-23 23 views
0

Spark(Scala)を使用して、共有されているユーザーとページの一覧を含むファイルを読み込んでいて、特定のユーザー彼らが共有していたページによって。大きな入力を処理するときのパフォーマンスが非常に遅い

プログラムの実行が非常に悪く、多くの場合、GC overhead limit exceededエラーが発生します。

私はMac OSX上で8GBのRAMを搭載したSparkをローカルで実行しています。プログラムは、--driver-memory 5gの引数を持つ​​と、spark.cores.maxで割り当てられた8つのコアを使用して送信されます。入力セットは1.15GBのファイルです。

操作が非常に非効率的であることを示す人がいますか?

ありがとうございます。

ここで、コードを簡単に説明します。

各ユーザーエントリには、彼/彼女は、タブの後に共有ページが含まれており、各エントリはそうのような2つの改行で区切られます。

John Doe <tab> Page 1 
      <tab> Page 2 
      <tab> Page 3 

User 2  <tab> ... 

まず私はnewAPIHadoopFileを使用して入力ファイルを読み込みます。

val hdpConf = new Configuration(sc.hadoopConfiguration) 
hdpConf.set("textinputformat.record.delimiter", "\n\n") 
val hadoopFile = sc.newAPIHadoopFile("user_pages.list", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hdpConf) 

は今、私はその後、私はすべてのユーザーとページの組み合わせ(page, user)のための単一の(k,v)ペアを含むRDDを作成するので、

val pagesPerUser = hadoopFile.map { 
    line => 
     val line_splitted = line._2.toString.split("\t"); 
     (line_splitted(0), line_splitted.drop(1).mkString.split("\n")) 
} 

のようにペア(user, Array(pagesShared))にこれを回します。 map

val pageAndUser = pagesPerUser.flatMap(line => line._2.map(page => (line._1, page))) 
    .map(...) 
    .filter(...) 

は、ページタイトルをフィルタリングするreplaceAllを使用し、filterは引用符とタイトルがいくつかのより多くの基準を満たすかどうかを確認するためにmatches()を使って正規表現が含まれている特定のタイトルを含むすべてのエントリを削除します。

次に、別のユーザー(user, user)に直接リンクされたすべてのユーザーのペアを作成し、(user, Array(user))という形式のRDDに変換します(同じページを共有しているすべての直接接続されたユーザーを含む)。

val pageAndUsers = pageAndUser.groupByKey.mapValues(_.toArray) 
    .map(line => line._2) 
val commonUsers = pageAndUsers.flatMap(users => users.map(user => (user, users))) 
    .reduceByKey(_ ++ _).cache() 
    .map(users => (users._1, users._2.distinct)) 

このRDDはその後、より一層ユーザーの間の距離を決定するために使用することができますが、私はパフォーマンスの低下は、主にこれらの部品の一つであると思います。

commonUsersと判断すると、プログラムの実行速度が、reduceByKeymapのステップで遅いことがわかります。私がそれがゆっくりと実行されていると判断する方法は、仲間のプログラマーの解決策と比較することです。さらに、私はしばしばGC overflow/Heap space exceededエラーを取得します。これは私のコードで何らかのメモリリークが発生していることを示します。

EDIT: いくつかのより多くの調査の後、私はこの問題は、reduceByKey(_++_)ステップであるかなり確信しています。私は代わりにgroupByKeyを使ってみましたが、プログラムは私のところで失敗し、その特定のポイントで毎回クラッシュするようです。

+0

まず第一に、あなたの設定を伝え、どのようにそれを提供していて、クラスタモードまたはクライアントモードでそれを使用していてください! –

+0

私は8GBのRAMを搭載したMac OSX上でSparkをローカルで実行しています。プログラムは '--driver-memory 5g'という引数と' spark.cores.max'を設定して割り当てられた8個のコアを持つ 'spark-submit'を使って送信されます。 – Laurens

+3

何が遅いですか?何に比べて?どのステージがパフォーマンスが悪いですか? (あなたはSparkのUIページでそれを見ることができます) – maasg

答えて

1

reduceByKeyを実行し、それを使用して不定サイズに拡大する可能性のあるデータを結合することは危険です。たとえば、ある意味でページを共有するユーザーをリンクしているかのように見えます。しかし、あなたのユーザーの1人が他のすべてのユーザーにリンクしている場合はどうなりますか? reduceByKeyで構築しようとしている配列は非常に大きくなります。これがメモリとGCの問題の原因です。

このステージが実行されているときにSpark UIを見ると、いくつかのタスクがハングアップすることが予想されます。これらは、単一のユーザーが多くのユーザーにリンクしている場所です。 (すべてのユーザーがすべてのユーザーにリンクしている場合は、すべてがハングしている可能性があります)。

あなたのデータをreduceByKey(「pageAndUsers」RDD)の前に保存し、そのデータをクエリして何が起こっているのかを確認します。

おそらく、合計で「少量」のユーザーがいる場合は、配列ではなくセットを使用できる可能性があります。これは、ユーザーがペアになるにつれて自動的に「区別」されるため、大きすぎます(データにもよりますが)。

ただし、問題を理解するためにはデータを参照する必要があります。私はちょうどここに述べた一連のロジックを使用するには、いくつかの例では、(正確に速くない)コードです:

val pageAndUsers = pageAndUser.groupByKey.mapValues(_.toSet) 
    .map(line => line._2) 
val commonUsers = pageAndUsers.flatMap(users => users.map(user => (user, users))) 
    .reduceByKey(_ ++ _).cache() 
+0

提案したように、私は 'pageAndUsers' RDDを保存して、' count'を実行しようとしました。 Spark GUIには、そのサイズが約1GBであることがわかります。 'reduceByKey'を含むステップで同じことをすると、データサイズが大量に増加し、データがディスクに流出することがわかります。したがって、 'reduceByKey'はあなたが言及したように実際にたくさんのデータを生成します。パフォーマンスを改善するにはどうすればよいでしょうか? – Laurens

+0

パフォーマンス上の問題ではありません。数日間放置すると、GCにすべての時間を費やすことになり、最終的には失敗します。非常に大きなパーティションを扱うことは非常に難しいです。ちょうどあなたが欲しいものとあなたのデータがどのように見えるかによって異なります。今のところ、私はあなたのデータを照会して、「悪い」ページを見つけてフィルタでフィルタリングすることができます。おそらく、すべてのユーザーが何人かのページに結びついているように、すべてのユーザーにリンクさせたくないのかもしれません。 –

関連する問題