2017-02-07 5 views
1

100万行のデータフレームの各行に1000を生成したい1000個のエントリを有するリストとのクロス積を取ることによってそれの各行から10個の行を生成し、それによって10億行のデータフレームを生成する。それを効率的に行うための最良の方法は何ですか? 私は、リストをブロードキャストし、それを使ってデータフレームの各行をマッピングしようとしました。しかし、これは時間がかかり過ぎているようです。あなたが行うことができますあなたのスパークバージョンによって10億行の新しいデータフレームを生成するために、リスト(1000エントリ)を持つSparkデータフレーム(100万行)デカルト積を効率的に実行する方法

val mappedrdd = validationDataFrames.map(x => { 
    val cutoffList : List[String] = cutoffListBroadcast.value 
    val arrayTruthTableVal = arrayTruthTableBroadcast.value 

    var listBufferRow: ListBuffer[Row] = new ListBuffer() 

     for(cutOff <- cutoffList){ 
      val conversion = x.get(0).asInstanceOf[Int] 
      val probability = x.get(1).asInstanceOf[Double] 

      var columnName : StringBuffer = new StringBuffer 
      columnName = columnName.append(conversion) 

      if(probability > cutOff.toDouble){ 
       columnName = columnName.append("_").append("1") 
      }else{ 
       columnName = columnName.append("_").append("0") 
      } 
      val index:Int = arrayTruthTableVal.indexOf(columnName.toString) 
      var listBuffer : ListBuffer[String] = new ListBuffer() 
      listBuffer :+= cutOff 

      for(i <- 1 to 4){ 
      if((index + 1) == i) listBuffer :+= "1" else listBuffer :+= "0" 
      } 
      val row = Row.fromSeq(listBuffer) 
      listBufferRow = listBufferRow :+ row 
     } 

     listBufferRow 

    }) 
+0

あなたがデータセットを使用して検討している、上記と同じサーバー上にかかりましたか?あなたは2つのテーブルに対してクロスジョインを適用することができます。 –

+0

"これは時間がかかりすぎているようです。"どのくらいの時間がかかりますか、どれくらいかかると思いますか? 10億行の生産は高価になるでしょう。 –

+0

実際それは決して完了しません。 listBufferRow = listBufferRow:+ row エグゼキュータのスレッドダンプを取ると、listBufferRow = listBufferRow:+ rowでスタックするスレッドが表示されます。 比較的小さなDFでも同じことがうまくいきます。 – arpit

答えて

2

スパーク2.1.0

は列としてリストを追加し、爆発します。簡単な例:あなたがすることができるタイミングについては

val df = spark.range(5) 
val exploded = df.withColumn("a",lit(List(1,2,3).toArray)).withColumn("a", explode($"a")) 
df.show() 

+---+---+ 
| id| a| 
+---+---+ 
| 0| 1| 
| 0| 2| 
| 0| 3| 
| 1| 1| 
| 1| 2| 
| 1| 3| 
| 2| 1| 
| 2| 2| 
| 2| 3| 
| 3| 1| 
| 3| 2| 
| 3| 3| 
| 4| 1| 
| 4| 2| 
| 4| 3| 
+---+---+ 

def time[R](block: => R): Long = { 
    val t0 = System.currentTimeMillis() 
    block // call-by-name 
    val t1 = System.currentTimeMillis() 
    t1 - t0 
} 
    time(spark.range(1000000).withColumn("a",lit((0 until 1000).toArray)).withColumn("a", explode($"a")).count()) 

は60

<スパークのデフォルトの並列度で構成されたメモリをたっぷりと16コアのコンピュータ上で5.41秒かかりました2.1.0

単純なUDFを定義できます。

val xx = (0 until 1000).toArray.toSeq // replace with your list but turn it to seq 
    val ff = udf(() => {xx}) 
    time(spark.range(1000000).withColumn("a",ff()).withColumn("a", explode($"a")).count()) 

は8.25秒

関連する問題