2016-03-24 23 views
0

自分のフィーチャ生成プロジェクトのツールとしてSparkを使用しています。アソシエートされていないフィーチャを生成するときにSparkのパフォーマンスが低下する

  • Datasource1を:次のように、この特定のプロジェクトのために、私はRDDSにロードする2つのデータソースを有するRDD1 = [(キー、(時間、量、ユーザID、...)J] => ... =>このようなトランザクションIDなどの他の属性の束、等
  • Datasource2:RDD2 = [(キー、(T1、T2)J)] RDD1で

、時間は、時間を表しますRDD2で、各機能の許容時間間隔を示しています。機能キーは「キー」です。

  1. 連想機能:例:商品
  2. 非結合機能の数、各機能キーに対するユーザー

の固有番号は、I間隔(T1、T2)に該当するイベントを確認する必要がありそれらを集約することができます。だから、次のように、私は減らす操作に続いて参加さ:

`RDD1.join(RDD2).map((key,(v1,v2))=>(key,featureObj)).reduceByKey(...)` 

を私の機能の初期値はfeatureObj =(0、セット([]))になり、最初の引数は、アイテムの数を保存する場所と2番目にユニークなユーザーIDの数が格納されます。また、入力データを分割して、RDD1とRDD2が同じパーティショナを使用していることを確認します。

ここで、結合フィーチャを計算するためにジョブを実行すると、わずか3分で16 m2.xlargeのクラスタで非常に高速に実行されます。 2分目を追加すると、計算時間は5分になります。私は他のいくつかの非関連機能を追加しようとしましたが、そのたびに実行時間が短くなりました。今、私の仕事は15分15分で実行されます。そのうちの10分は非関連です。私もKyroSerializerを使い、シリアル化された形式でRDDを永続化しようとしましたが、何も特別なことはありませんでした。私はもっ​​と多くの機能を実装しようとしているので、この問題はボトルネックになるようです。

PS。私は単一の大きなホスト(128GBのRAMと16コア)で同じタスクを実行しようとしました。 145の機能で、10分で作業が完了しました。私はSparkボトルネックがJOINであるという印象を受けています。私は自分のRDDをチェックし、両方が同じ方法で同じパーティションに分割されていることに気づいた。 1つの仕事がこれらの2つのRDDを呼んでいるので、私はそれらが同じ場所にあると推測していますか?しかし、spark web-consoleには、「2.6GB」のシャッフル読み取りと「15.6GB」のシャッフル書き込みがまだ表示されています。

私がここで本当にクレイジーなことをしていると誰かに助言してもらえますか?間違ったアプリケーションにSparkを使用していますか?コメントをお寄せいただきありがとうございます。よろしく、

アリ

+1

基本的に 'groupByKey'のいくつかの亜種を追加していますが、パフォーマンスは悪化していますか?それはかなり期待される動作です。 – zero323

+0

これは間違いありません。しかし、私の質問は別のものです。私が上で議論したフィーチャ生成パイプライン全体を単一のホスト(128コアのメモリと16コア)に実装し、すべてのプロセス(すべての非関連機能を含む)を10分で実行しました。 Spark(メモリは61GB、コア数は8の16ノード×16ノード)のため、全機能の10%で15分しか使用できません。私は主要なボトルネックが非連想機能にあることを知っています。これはSparkがこのアプリケーションに適したツールではないという兆候ですか?私はシャッフルを避けるためにデータを分割しましたが、私はシャッフルしています。 – gorjida

+0

あなたのコードを1つ実行すると、シャッフルのコストが最小限に抑えられるため、すべてのマシンがすべてローカルに近づきます。さらに、パーティション分割はシャッフルを削除しません。シャッフルを明示的にするだけです。最終的には共同で分割されました!=共同設置されました(既にSOでカバーされています)。他にもおそらく他の問題があります。 – zero323

答えて

0

私も、シャッフル操作でパフォーマンスの低下に気づきました。同じエグゼキュータ内のあるコアから別のコアにデータがシャッフルされたとき(ローカルPROCESS_LOCAL)、シャッフルは非常に速く実行されていましたが、他のすべての状況では予想よりもはるかに遅く、NODE_LOCALでも非常に遅かったことが判明しました。これはSpark UIで見ることができます。

さらにCPUとガベージコレクションの監視を行ったところ、ある時点でガーベジコレクションによってクラスタ内のノードの1つが応答しなくなり、他のノードがこのノードとの間でデータをやりとりすることがブロックされていました。

ガベージコレクションのパフォーマンスを向上させるために調整できるオプションがたくさんあります。 1つの重要なことは、javaの8u45以上が必要なG1ガベージコレクタ用の膨大なオブジェクトの早期再利用を可能にすることです。

私の場合、最大の問題はネットのメモリ割り当てでした。 spark.shuffle.io.preferDirectBufs = falseを設定してダイレクトバッファメモリをオフにしたとき、私のジョブはずっと安定していました。

+0

洞察に感謝します。私の場合、GCはボトルネックには見えません。 Webコンソールをチェックすると、GC時間は総タスク時間のわずか10%にすぎません。 reduceByKeyの代わりに、mapPartitionWithIndexを使用するオプションがありました。私はマップ関数内で手動で集計を行いました。実行時間は2倍になりました。私が気づいたのは、新しいファミリのファミリを追加するたびにシャッフル書き込みが5GB増加し、これがパフォーマンスを犠牲にしたことでした。 mapPartitionWithIndexが良い選択肢であるかどうか分かりません。 – gorjida

関連する問題