2016-12-20 7 views
3

Spark Scalaスクリプトで少し問題が発生しました。基本的には、グループ化やカウントなどの後に集計を行う生データがあります。出力を特定のJSON形式に保存する必要があります。ケースクラスとカラム名エイリアスでリフレクションを使用したSparkデータフレームスキーマ定義

EDIT:

私は質問を簡素化しようとし、それを書き直し:

私は列名が別名を持っているArray[org.apache.spark.sql.Column]とソースデータフレームからのデータを選択すると、その列名を使用して(または実際にはインデックス)を変数として使用して、行をクラスにマップしようとすると、「タスクは直列化できません」という例外が発生します。

var dm = sqlContext.createDataFrame(Seq((1,"James"),(2,"Anna"))).toDF("id", "name") 

val cl = dm.columns 
val cl2 = cl.map(name => col(name).as(name.capitalize)) 
val dm2 = dm.select(cl2:_*) 
val n = "Name" 
case class Result(Name:String) 
val r = dm2.map(row => Result(row.getAs(n))).toDF 

そして第二部や質問、私は実際にこれらのResultクラスオブジェクトの配列であることを、最終的なスキーマを必要としています。私はまだ分かっていない、どのようにこれを行うにも。

case class Test(var FilteredStatistics: Array[Result]) 
    val t = Test(Array(Result("Anna"), Result("James"))) 

    val t2 = sc.parallelize(Seq(t)).toDF 

    scala> t2.printSchema 
    root 
    |-- FilteredStatistics: array (nullable = true) 
    | |-- element: struct (containsNull = true) 
    | | |-- Name: string (nullable = true) 

TL; DR:期待される結果はそのようなスキーマを持つべきデータフレームの列が別名を持っていると変数が使用されたときにケースのクラスオブジェクトへのデータフレームの列をマッピングする方法

  1. 列名?

  2. これらのケースクラスオブジェクトを配列に追加するにはどうすればよいですか?

+0

シリアル化の問題は再現されません - 私はあなたのコードをすべてコピーしてくれました。あなたのコードのどこかのように見えます(ここには貼り付けられていません)。DataFrameで使用されるケースクラス内で、または直列化されてworkerに送信される変換で 'org.apache.spark.sql.Column'オブジェクトを使用しています... –

+2

ところで、私たちのうちの1人は、おそらくこの非常に丁寧な質問の細部に迷っているでしょう。それを_minimize_しよう。問題を再現する最も単純な例を見つけてください(同様の最小化の後に別の質問を別途お願いします) –

+0

シリアライゼーションの問題を解決するための修正。 'class Result(???)extends Serializable;オブジェクト結果{def適用(r:行):結果= r一致{??? }} '次に、あなたのDFにあるかもしれない様々なフォーマットに対処するために、rにパターンマッチングを使用します。行の一部にクラスを適用しようとすると問題になることがよくありますが、行全体をマップできるクラスを作成すると、 'DF.map(Result)'が機能することがあります。タスクがシリアル化可能ではない問題を理解するためのヘルプについては、 – kmh

答えて

0

シリアライゼーション問題:ここでの問題はval n = "Name"である:それは、その変数それを含む範囲にわたって密接スパークせるRDD変換(dm2.map(...))に渡された匿名関数、内部で使用され、 cl2も含まれ、タイプはArray[Column]であるため、シリアル化できません。

解決方法は簡単です - インラインndm2.map(row => Result(row.getAs("Name"))))を取得するか、シリアライズ可能なコンテキスト(非シリアル化可能なメンバーを含まないオブジェクトまたはクラス)に配置します。

+0

さて、私はまだいくつかの困難を抱えています。列名を1つの場所に宣言してコード全体で使用すると、インラインで行う必要はありません。私は複数の場所でそれらを変更しなければならないので、私はそれらを複製したくない。しかし、必要な変数を保持するためのオブジェクトまたはクラスを作成する必要があることを意味しましたか? 'Params.name'をgetAsの内部で使用することを想定していました。助けてくれなかった。申し訳ありませんが、私は何かを見逃した場合 –

+0

オブジェクトが他のクラスにネストされていない場合 - それは動作するはずです(私のために働く...)。 * spark-shell *でオブジェクトを作成すると、シェルコード全体をカプセル化するオブジェクトに実際にネストされているので、そこでは機能しない可能性があります(私は思う) –

関連する問題