2017-11-30 7 views
1

現在、sparksqlでscala言語で書かれたパフォーマンスの問題が発生しています。申請の流れは以下の通りです。collectメソッドのSparkSQLパフォーマンスの問題

  1. スパークアプリケーションは、入力からテキストファイルがディレクトリ
  2. プログラムでスキーマを指定使用して、ファイルの先頭にデータフレームを作成しHDFS読み出します。このデータフレームは、メモリに保持された入力ファイルの正確な複製になります。データフレーム

    var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)

  3. に約18の列を持つことになりますと、手順2の異なるキーワードの助けを借りて、独自の口座番号が含まれています。このデータフレームで構成された第1のデータフレームからのフィルタリングデータフレームを作成します。ステップ2 & 3で構築さ2つのデータフレームを使用して

    var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()

  4. 、我々は1つの口座番号に属しているすべてのレコードを取得し、フィルタリングされたデータの上にいくつかのJSONの解析ロジックを行います。

    var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()

  5. 最後にJSONはデータがデータフレームの上に収集メソッドの呼び出し中にここでは、パフォーマンスの問題に直面しているHBaseのテーブルに

を置かれる解析されました。 collectはすべてのデータを単一のノードにフェッチしてから処理を行うため、並列処理の利点が失われるためです。 実際のシナリオでも、私たちが期待できる100億レコードのデータがあります。したがって、ドライバーノードにすべてのレコードを収集すると、メモリーまたはディスクスペースの制限のためにプログラム自体がクラッシュする可能性があります。

私たちのケースでは、一度に限られた数のレコードをフェッチするtakeメソッドは使用できないと思います。データ全体から一意の口座番号をすべて取得する必要があるため、一度に レコードを取得するテイクメソッドが、私たちの要件に合っているかどうかはわかりません。

収集方法を呼び出すのを避け、何か他のベストプラクティスに従う。誰もが直面した同様の問題

があった場合は、コードスニペット/提案/ gitのリンクがある

コードは通常、このような状況で取る

val eqpSchemaString = "acoountnumber ....." 
    val eqpSchema = StructType(eqpSchemaString.split(" ").map(fieldName => 
StructField(fieldName, StringType, true))); 
    val eqpRdd = sc.textFile(inputPath) 
    val eqpRowRdd = eqpRdd.map(_.split(",")).map(eqpRow => Row(eqpRow(0).trim, eqpRow(1).trim, ....) 

    var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema); 


    var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect() 


    distAccNrsDF.foreach { data => 

     var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect() 



     var result = new JSONObject() 

     result.put("jsonSchemaVersion", "1.0") 
     val firstRowAcc = filtrEqpDF(0) 
     //Json parsing logic 
     { 
     ..... 
     ..... 
     } 
    } 
+0

実際に何をしたいですか?ちょうどHbaseテーブルに書き込みますか?それが事実なら、なぜあなたはテイクを収集したいのですか?採取と採取はサンプルデータを見るためだけに使用されます。それ以外には、回収または取る必要はありません。 – Phoenix

+0

基本的には、同じアカウント番号に属するすべてのデータを(ソースファイル内で)グループ化し、グループ化されたデータをhbaseにプッシュする必要があります。グローバルに異なる口座番号を見つけるために、私たちが使っているものを集めてください。収集しないと、複数のノードに広がっているユニークなアカウント番号をグローバルに見つけることができます。 – afzal

答えて

2

アプローチをスニペット非常に参考になります。

  • の代わりにcollectforeachPartitionforeachPartitionは、基礎となるDataFrameの各パーティション(で表される)に個別に機能を適用します(パーティションは、アトミックな並列処理単位関数は、したがって、パーティションごとに作るHBaseの(への接続を開く)と、この接続

を介してすべて含まれている値を送信します

  • スパーク)のこれは、シリアライズされていない(すべてのexecutorが接続を開き意味します機能の境界内に存在するため、ネットワーク経由で送信する必要はなく)、その内容をHBaseに独立して送信するため、ドライバ(またはそのノードに関するすべてのデータ)を収集する必要はありません。

    CSVファイルを読んでいるように見えますので、おそらく次のようなものは、トリックを行います:

    spark.read.csv(inputPath).   // Using DataFrameReader but your way works too 
        foreachPartition { rows => 
        val conn = ???     // Create HBase connection 
        for (row <- rows) {   // Loop over the iterator 
         val data = parseJson(row) // Your parsing logic 
         ???       // Use 'conn' to save 'data' 
        } 
        } 
    
  • 2

    あなたは、データの大規模なセットを持っている場合は、あなたのコード内で収集無視することができます。

    収集ドライバプログラムで、データセットのすべての要素を配列として返します。これは、通常、データの十分に小さなサブセットを返すフィルタやその他の操作の後に便利です。

    また、collect()はRDD/DF全体を単一のマシンにフェッチするため、ドライバのメモリが不足する可能性があります。

    あなたのコードを編集しましたが、これはうまくいくはずです。

     var distAccNrsDF = eqpDF.select("accountnumber").distinct() 
          distAccNrsDF.foreach { data => 
           var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'") 
           var result = new JSONObject() 
           result.put("jsonSchemaVersion", "1.0") 
           val firstRowAcc = filtrEqpDF(0) 
           //Json parsing logic 
           { 
           ..... 
           ..... 
           } 
          }