を生成するために、私は、JSONファイルからDataFrame
を生成していますこのように:スパーク - 私は私がスパーク1.6</p> <p>を使用するように強制していますと言って開始したいどのように配列しているデータフレーム内の要素をマージ/結合するには、[行]行
{"id" : "1201", "name" : "satish", "age" : "25"},
{"id" : "1202", "name" : "krishna", "age" : "28"},
{"id" : "1203", "name" : "amith", "age" : "28"},
{"id" : "1204", "name" : "javed", "age" : "23"},
{"id" : "1205", "name" : "mendy", "age" : "25"},
{"id" : "1206", "name" : "rob", "age" : "24"},
{"id" : "1207", "name" : "prudvi", "age" : "23"}
DataFrame
次のようになります。私はこのDataFrame
で何
+---+----+-------+
|age| id| name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 28|1203| amith|
| 23|1204| javed|
| 25|1205| mendy|
| 24|1206| rob|
| 23|1207| prudvi|
+---+----+-------+
、年齢によってグループにIDおよびフィルタAによるためであります1人以上の学生がいる年齢層。私は次のスクリプトを使用します。
import sqlContext.implicits._
val df = sqlContext.read.json("students.json")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
val arrLen = udf {a: Seq[Row] => a.length > 1 }
val mergedDF = df.withColumn("newCol", collect_set(struct("age","id","name")).over(Window.partitionBy("age").orderBy("id"))).select("newCol","age")
val filterd = mergedDF.filter(arrLen(col("newCol")))
をそして今現在の結果は次のとおりです。私は今、欲しい
[WrappedArray([28,1203,amith], [28,1202,krishna]),28]
[WrappedArray([25,1201,satish], [25,1205,mendy]),25]
[WrappedArray([23,1204,javed], [23,1207,prudvi]),23]
は例えばのid
を取って、一つにWrappedArray
の中にあるものを2つの学生の行をマージすることです第1生徒と第2生徒のname
である。
def PrintOne(List : Seq[Row], age : String):Row ={
val studentsDetails = Array(age, List(0).getAs[String]("id"), List(1).getAs[String]("name"))
val mergedStudent= new GenericRowWithSchema(studentsDetails .toArray,List(0).schema)
mergedStudent
}
私は私がテストしたときに、それはそれは期待値を出力するforeachを使用しているため、この機能は、トリックを行う知っている:
filterd.foreach{x => val student = PrintOne(x.getAs[Seq[Row]](0), x.getAs[String]("age"))
println("merged student: "+student)
}
私は次の関数を書いたことを達成するために
出力:
merged student: [28,1203,krishna]
merged student: [23,1204,prudvi]
merged student: [25,1201,mendy]
しかし、私は番目の戻り値を収集するために、マップ内で同じことをしようeの問題が始まります。
私はエンコーダなしで実行した場合:
val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}
私は次の例外を取得:いいえ エンコーダがorg.apache見つかり:スレッド "メイン" java.lang.UnsupportedOperationExceptionがで
例外。 spark.sql.Row - フィールド(クラス: "org.apache.spark.sql.Row"、名称: "_2") - ルートクラス: "scala.Tuple2"
そして、私は自分でEconder
を生成しようとすると、私も失敗:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
implicit val encoder = RowEncoder(filterd.schema)
val merged = filterd.map{row => (row.getAs[String]("age") , PrintOne(row.getAs[Seq[Row]](0), row.getAs[String]("age")))}(encoder)
型の不一致が。見つかった: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder [org.apache.spark.sql.Row] 必須:org.apache.spark.sql.Encoder [(String、 org.apache.spark。 sql.Row)]
正しいエンコーダを提供するにはどうすればよいでしょうか?
私はマップ+カスタム関数の使用を避けるように言われてきましたが、私が適用する必要があるロジックは、各行から1つのフィールドを取り上げるより複雑です。いくつかのフィールドを組み合わせて、行の順序を調べたり、値がnullであるかどうかを調べたりすることが、より多くなります。そして私が知っている限りでは、私はそれを解決することができるカスタム関数を使用しています。
https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-dataset –