2017-11-02 8 views
1

このスカラーコードは、Spark DataFrameの各行に対して何かを行うために書きました。基本的にこれは私が行うステップですループの出力をデータフレームに書き込む

1. I convert the DataFrame into an array 
2. Iterate through the array and perform calculations and get the output in an array 
3. convert the output of the array to a dataframe and then make a Hive table. 

私は100万レコードを実行するときに問題があります。とにかくパフォーマンスを高めることができますか? AFAIKのsparkデータフレームは反復できないため、データフレームを配列に変換するだけです。

def getRows (ca : org.apache.spark.sql.DataFrame) = 
{ 
    val allca = List() 
    for (a <- ca.collect()) yield 
    { 
    val newAddress = a.getString(1) 
    val output = newAddress :: getRecursiveList(newAddress).reverse 


    val dataset = 
CA (account.getInt(0), 
      account.getString(1), 
      account.getString(2), 
      output.toString) 

    dataset :: allca 
    } 
} 

val myArray = getRows(customerAccounts) 

val OutputDataFrame = sc.parallelize(myArray.flatMap(x => x)).toDF 

OutputDataFrame.show() 


val resultsRDD = OutputDataFrame.registerTempTable("history") 

spark.sql(""" insert into user_tech.history select * from history """).collect.foreach(println) 
+0

については、以下のリンクに従ってください? –

+0

良い点、私はそれを行うことができます。私はこれが可能かどうかを知りたかっただけです – Srinivas

+0

あなたも使うことができるならば、inbuilt関数をスパークすることも見てくださいhttps://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.htmlそれら。彼らはあなたにudfよりも良いパフォーマンスを与えるべきです。しかし、あなたのロジックを関数の1つで行うことができない場合は、udf関数を使う必要があります。 –

答えて

0

いくつかの基本を理解してください:

  1. スパークのScala/JavaのAPIは非常に高いレベルの視点を提供し、データ構造の分散性の任意のアイデアを提供していません。
  2. データフレームを反復するには、2つの方法があります。1つのマシン上ですべてのデータを分散して反復処理してから反復処理するかの2つの選択肢があります。
  3. ca.collect()は、すべてのノードからデータフレームからデータを収集し、スケーラブルな解決策ではないデータを処理するドライバに取得しています。

    は、あなただけのUDFを書くことができないよりよく理解

    1. http://bytepadding.com/big-data/spark/spark-code-analysis/
    2. http://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/
関連する問題