2017-02-02 10 views
2

私はスパークforeachpartition接続の改善は

  1. は、HDFSのテキストファイルからデータを読み込み、以下の操作を行い火花ジョブを書かれています。
  2. 重複をフィルタリングするためにdistinct()を呼び出します。
  3. はmapToPairフェーズを行うとreducebykeyコール
  4. 行うグループ化されたタプルの集約ロジックを実行してください
  5. pairRDD
  6. を生成します。
  7. 今#5

    上のforeachを呼び出すここでは

    1. を行いカサンドラのDBへの呼び出し
    2. AWS SNSとSQSのクライアント接続
    3. を作成するには、いくつかのJSONレコードの書式設定を行います。
    4. 私はこの仕事を実行すると、それは

      最初のステージを3つのスパークステージを作成

SNS/SQSにレコードを公開 - それは、ほぼ45秒かかります。明確な 第二段階を実行 - mapToPairとreducebykey = 1.5分

第三段階

を取る=取る19分

私はそうDB原因を打つ参照カサンドラ通話を切っ

  1. 何をしたか

    - これはあまり時間を割いて

  2. 私が見つけた
  3. 問題の一部は、SNS/SQS接続foreachのパーティションにその多くを服用し

を作成することです全体の60%以上

私はforeachPartition内でSNS/SQS Connectionを作成して接続を減らしています。我々は、これらは、シリアライズされないように私は、ドライバに接続オブジェクトを作成できません

も、より良い方法を持っています

私は5gのエグゼキュータ9の数、executoreコア15、ドライバメモリ2G、エグゼキュータのメモリを使用していない

私は16コアを使用しています64ギガのメモリ クラスタサイズ1つのマスター9スレーブすべて同じ設定 EMR展開スパークあなたはノードごとに1つのSNS/SQS接続を設定し、その後にそれを使用したいと思うようですね1.6

+0

あなたは 'AWS SNSとSQSのクライアント接続 ' 60%の仕事の時間を割いたり、 'SNS/SQS'にこれを記録を公開しているを作成しますか?これら2つの間には若干の違いがあります。最初のケースでは、接続の作成回数を最小限に抑える必要があります。一方、2番目のケースでは、データを分散して(さらに多くの接続インスタンスを作成する)必要があります。面白い!!!! – code

+0

それが第2のケースであれば、私は解答を掲示します。 – code

答えて

1

各ノードのすべてのデータを処理します。

ここではforeachPartitionが正しいアイデアだと思っていますが、事前にRDDを統合することをお勧めします。これにより、シャッフルすることなく同じノード上のパーティションが折りたたまれ、余分なSNS/SQS接続の開始を避けることができます。

こちらをご覧ください: https://spark.apache.org/docs/latest/api/scala/index.html#[email protected](numPartitions:Int,shuffle:Boolean,partitionCoalescer:Option[org.apache.spark.rdd.PartitionCoalescer])(implicitord:Ordering[T]):org.apache.spark.rdd.RDD[T]

+0

はい合体は私の解決策です。私がここに追加したいもう一つのポイント。私は23kb、45kbのような多くの小さなファイルを持っていました。そして、それは正しいパーティションに収縮して、今私は20分で25GBに近いプロセスを処理することができます。ここで改善する – Sam

+0

ありがとうBradley ..もう1つ..これは、私が作成しなければならない合体でいくつのパーティションを処理するには1TBのデータが必要だと言われましたか? – Sam

+0

だから私は、それぞれがメモリやコアの数に収まるように十分な大きさのパーティションを使用します。どちらが大きいか。 –