2017-10-02 2 views
1

を生成するために、私は、JSONファイルからDataFrameを生成していますこのように:スパーク - 私は私がスパーク1.6</p> <p>を使用するように強制していますと言って開始したいどのように配列しているデータフレーム内の要素をマージ/結合するには、[行]行

{"id" : "1201", "name" : "satish", "age" : "25"}, 
{"id" : "1202", "name" : "krishna", "age" : "28"}, 
{"id" : "1203", "name" : "amith", "age" : "28"}, 
{"id" : "1204", "name" : "javed", "age" : "23"}, 
{"id" : "1205", "name" : "mendy", "age" : "25"}, 
{"id" : "1206", "name" : "rob", "age" : "24"}, 
{"id" : "1207", "name" : "prudvi", "age" : "23"} 

DataFrame次のようになります。私はこのDataFrameで何

+---+----+-------+ 
|age| id| name| 
+---+----+-------+ 
| 25|1201| satish| 
| 28|1202|krishna| 
| 28|1203| amith| 
| 23|1204| javed| 
| 25|1205| mendy| 
| 24|1206| rob| 
| 23|1207| prudvi| 
+---+----+-------+ 

、年齢によってグループにIDおよびフィルタAによるためであります1人以上の学生がいる年齢層。私は次のスクリプトを使用します。

import sqlContext.implicits._ 

val df = sqlContext.read.json("students.json") 

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions._ 

val arrLen = udf {a: Seq[Row] => a.length > 1 } 

val mergedDF = df.withColumn("newCol", collect_set(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("newCol","age") 

val filterd = mergedDF.filter(arrLen(col("newCol"))) 

をそして今現在の結果は次のとおりです。私は今、欲しい

[WrappedArray([28,1203,amith], [28,1202,krishna]),28] 
[WrappedArray([25,1201,satish], [25,1205,mendy]),25] 
[WrappedArray([23,1204,javed], [23,1207,prudvi]),23] 

は例えばのidを取って、一つにWrappedArrayの中にあるものを2つの学生の行をマージすることです第1生徒と第2生徒のnameである。

def PrintOne(List : Seq[Row], age : String):Row ={ 
     val studentsDetails = Array(age, List(0).getAs[String]("id"), List(1).getAs[String]("name")) 
     val mergedStudent= new GenericRowWithSchema(studentsDetails .toArray,List(0).schema) 

     mergedStudent 
    } 

私は私がテストしたときに、それはそれは期待値を出力するforeachを使用しているため、この機能は、トリックを行う知っている:

filterd.foreach{x => val student = PrintOne(x.getAs[Seq[Row]](0), x.getAs[String]("age")) 
         println("merged student: "+student) 
        } 

私は次の関数を書いたことを達成するために

出力:

merged student: [28,1203,krishna] 
merged student: [23,1204,prudvi] 
merged student: [25,1201,mendy] 

しかし、私は番目の戻り値を収集するために、マップ内で同じことをしようeの問題が始まります。

私はエンコーダなしで実行した場合:

val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))} 

私は次の例外を取得:いいえ エンコーダがorg.apache見つかり:スレッド "メイン" java.lang.UnsupportedOperationExceptionがで

例外。 spark.sql.Row - フィールド(クラス: "org.apache.spark.sql.Row"、名称: "_2") - ルートクラス: "scala.Tuple2"

そして、私は自分でEconderを生成しようとすると、私も失敗:

import org.apache.spark.sql.catalyst.encoders.RowEncoder 
    implicit val encoder = RowEncoder(filterd.schema) 

    val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}(encoder) 

型の不一致が。見つかった: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder [org.apache.spark.sql.Row] 必須:org.apache.spark.sql.Encoder [(String、 org.apache.spark。 sql.Row)]

正しいエンコーダを提供するにはどうすればよいでしょうか?

私はマップ+カスタム関数の使用を避けるように言われてきましたが、私が適用する必要があるロジックは、各行から1つのフィールドを取り上げるより複雑です。いくつかのフィールドを組み合わせて、行の順序を調べたり、値がnullであるかどうかを調べたりすることが、より多くなります。そして私が知っている限りでは、私はそれを解決することができるカスタム関数を使用しています。

+0

https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset –

答えて

2

mapからの出力は(String, Row)です。したがって、RowEncoderだけを使用してエンコードすることはできません。

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.{Encoder, Encoders} 
import org.apache.spark.sql.catalyst.encoders.RowEncoder 

val encoder = Encoders.tuple(
    Encoders.STRING, 
    RowEncoder(
    // The same as df.schema in your case 
    StructType(Seq(
     StructField("age", StringType), 
     StructField("id", StringType), 
     StructField("name", StringType))))) 

filterd.map{row => (
    row.getAs[String]("age"), 
    PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age"))) 
}(encoder) 

このアプローチは全体的にアンチパターンのように見えます。あなたはより多くの機能的なスタイルを使用したい場合は、Dataset[Row]を回避する必要があります。

case class Person(age: String, id: String, name: String) 

filterd.as[(Seq[Person], String)].map { 
    case (people, age) => (age, (age, people(0).id, people(1).name)) 
} 

またはudf

GenericRowWithSchemaを含むo.a.s.sql.catalystパッケージは、主に社内での使用を目的としています。他に必要がなければ、o.a.s.sql.Rowを好む。

関連する問題