Spark Scalaスクリプトで少し問題が発生しました。基本的には、グループ化やカウントなどの後に集計を行う生データがあります。出力を特定のJSON形式に保存する必要があります。ケースクラスとカラム名エイリアスでリフレクションを使用したSparkデータフレームスキーマ定義
EDIT:
私は質問を簡素化しようとし、それを書き直し:
私は列名が別名を持っているArray[org.apache.spark.sql.Column]
とソースデータフレームからのデータを選択すると、その列名を使用して(または実際にはインデックス)を変数として使用して、行をクラスにマップしようとすると、「タスクは直列化できません」という例外が発生します。
var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name")
val cl = dm.columns
val cl2 = cl.map(name => col(name).as(name.capitalize))
val dm2 = dm.select(cl2:_*)
val n = "Name"
case class Result(Name:String)
val r = dm2.map(row => Result(row.getAs(n))).toDF
そして第二部や質問、私は実際にこれらのResult
クラスオブジェクトの配列であることを、最終的なスキーマを必要としています。私はまだ分かっていない、どのようにこれを行うにも。
case class Test(var FilteredStatistics: Array[Result])
val t = Test(Array(Result("Anna"), Result("James")))
val t2 = sc.parallelize(Seq(t)).toDF
scala> t2.printSchema
root
|-- FilteredStatistics: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
TL; DR:期待される結果はそのようなスキーマを持つべきデータフレームの列が別名を持っていると変数が使用されたときにケースのクラスオブジェクトへのデータフレームの列をマッピングする方法
列名?
これらのケースクラスオブジェクトを配列に追加するにはどうすればよいですか?
シリアル化の問題は再現されません - 私はあなたのコードをすべてコピーしてくれました。あなたのコードのどこかのように見えます(ここには貼り付けられていません)。DataFrameで使用されるケースクラス内で、または直列化されてworkerに送信される変換で 'org.apache.spark.sql.Column'オブジェクトを使用しています... –
ところで、私たちのうちの1人は、おそらくこの非常に丁寧な質問の細部に迷っているでしょう。それを_minimize_しよう。問題を再現する最も単純な例を見つけてください(同様の最小化の後に別の質問を別途お願いします) –
シリアライゼーションの問題を解決するための修正。 'class Result(???)extends Serializable;オブジェクト結果{def適用(r:行):結果= r一致{??? }} '次に、あなたのDFにあるかもしれない様々なフォーマットに対処するために、rにパターンマッチングを使用します。行の一部にクラスを適用しようとすると問題になることがよくありますが、行全体をマップできるクラスを作成すると、 'DF.map(Result)'が機能することがあります。タスクがシリアル化可能ではない問題を理解するためのヘルプについては、 – kmh