1
このスカラーコードは、Spark DataFrameの各行に対して何かを行うために書きました。基本的にこれは私が行うステップですループの出力をデータフレームに書き込む
1. I convert the DataFrame into an array
2. Iterate through the array and perform calculations and get the output in an array
3. convert the output of the array to a dataframe and then make a Hive table.
私は100万レコードを実行するときに問題があります。とにかくパフォーマンスを高めることができますか? AFAIKのsparkデータフレームは反復できないため、データフレームを配列に変換するだけです。
def getRows (ca : org.apache.spark.sql.DataFrame) =
{
val allca = List()
for (a <- ca.collect()) yield
{
val newAddress = a.getString(1)
val output = newAddress :: getRecursiveList(newAddress).reverse
val dataset =
CA (account.getInt(0),
account.getString(1),
account.getString(2),
output.toString)
dataset :: allca
}
}
val myArray = getRows(customerAccounts)
val OutputDataFrame = sc.parallelize(myArray.flatMap(x => x)).toDF
OutputDataFrame.show()
val resultsRDD = OutputDataFrame.registerTempTable("history")
spark.sql(""" insert into user_tech.history select * from history """).collect.foreach(println)
については、以下のリンクに従ってください? –
良い点、私はそれを行うことができます。私はこれが可能かどうかを知りたかっただけです – Srinivas
あなたも使うことができるならば、inbuilt関数をスパークすることも見てくださいhttps://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.htmlそれら。彼らはあなたにudfよりも良いパフォーマンスを与えるべきです。しかし、あなたのロジックを関数の1つで行うことができない場合は、udf関数を使う必要があります。 –