2016-06-27 28 views
3

Array[org.apache.spark.sql.Row]DataFrameに変換したいとします。 誰かが私に良い方法を提案できますか?Spark/Scalaで配列[行]をDataFrameに変換する

最初にRDDに変換してからDataframeに変換しようとしましたが、DataFrameの操作を実行すると例外が表示されます。

ここ
val arrayOfRows = myDataFrame.collect().map(t => myfun(t)) 
val distDataRDD = sc.parallelize(arrayOfRows) 
val newDataframe = sqlContext.createDataFrame(distDataRDD,myschema) 

myfun()Row (org.apache.spark.sql.Row)を返す関数です。 配列の内容が正しいため、問題なく印刷できます。

しかし、私がRDDのレコードを数えようとしたとき、それは私にカウントと、ステージの1つに非常に大きなサイズのタスクが含まれているという警告を与えました。私は何か間違っていると思います。助けてください。

答えて

1

最初の行にバグがあります。 collectはArrayを返し、mapはDataFrames/RDDで動作するメソッドです。

代わりにval arrayOfRows = myDataFrame.map(t => myfun(t)).collect()を試してください。

+0

注文を変更するとこのエラーが発生します。** org.apache.spark .SparkException:タスクがシリアライズできない** – rvp

+0

arrayofRowsは実際にDataFrameタイプであるため、2行目と3行目は必要ありません(sc.parallelizeはRDDを受け取り、DataFramesは受け付けません)。 –

+0

エラーが発生します。私が最初の行に入るとすぐ 'val arrayOfRows = myD ataFrame.collect()。map(t => myfun(t)) ' – rvp

0

ケースクラスPgRnk(のuserId:ロング、ページランク:ダブル)//がケースクラス

sc.parallelize(pg10.map(R1 => PgRnk(r1.getLong(0)、r1.getDoubleを(作成toDS()//データセットに変換すると、sc.parallelizeは配列をRDDに変換してからDSに変換します

関連する問題