私はセッションと呼ばれるデータフレームを持ち、時間とともに変化する可能性があります。 (を編集して明示します:私は列の大文字小文字のクラスを持っていません - 反映されたスキーマのみ)私は一貫して追跡スコープを構成するかもしれないいくつかの他の内側スコープ列と外側スコープ列を持つ外側スコープのuuidとclientIdを持ちます私は今、これらのパラメータと、まだツーことが解決コードのロジックを含むUDFでoldTrackingEventsとnewTrackingEventsをマージしたい行オブジェクトに埋め込まれた未定義のケースクラスの配列を連結するためのUDF
root
|-- runtimestamp: long (nullable = true)
|-- clientId: long (nullable = true)
|-- uuid: string (nullable = true)
|-- oldTrackingEvents: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- timestamp: long (nullable = true)
| | |-- actionid: integer (nullable = true)
| | |-- actiontype: string (nullable = true)
| | |-- <tbd ... maps, arrays and other stuff matches sibling> section
...
|-- newTrackingEvents: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- timestamp: long (nullable = true)
| | |-- actionid: integer (nullable = true)
| | |-- actiontype: string (nullable = true)
| | |-- <tbd ... maps, arrays and other stuff matches sibling>
...
::のようなので...何か
val mergeTEs = udf((oldTEs : Seq[Row], newTEs : Seq[Row]) =>
// do some stuff - figure best way
// - to merge both groups of tracking events
// - remove duplicates tracker events structures
// - limit total tracking events < 500
return result // same type as UDF input params
)
UDFの戻り結果は連結された2つのfの結果のリストである構造体の配列になりますields。
QUESTION: 私の質問は、このようなUDF構築する方法である - 渡されたパラメータの型が正しい、(2)UDF内でこれらのコレクションを操作する方法と、(3)明確な方法の(1)使用をコンパイラエラーのない値を返す。私は、Seq[Row]
の入出力(val testUDF = udf((trackingEvents : Seq[Row]) => trackingEvents)
)のエラーを受け取り、trackingEvents
の直接返品のエラーjava.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported
を受け取りましたが、trackingEvents
の代わりにSome(1)
を返すエラーは発生しません。 UDFは、コメント欄での活動を使用して上記のスキーマによって提案されたように私は、同一構造の2つのリストを連結することができます目標は、この操作を使用することです:。
sessions.select(mergeTEs('oldTrackingEvents, 'newTrackingEvents).as("cleanTrackingEvents"))
そして、各行の
、...取り戻します'trackingEvents'構造の単一の配列をメモリ/スピード効率のよい方法で実装します。補足:関連性が存在する場合は
私に示した質問を見てみると...この他のポストが有用/関連がある...おそらく...、Defining a UDF that accepts an Array of objects in a Spark DataFrame? ... To create struct function passed to udf has to return Product type (Tuple* or case class), not Row.
を可能にヒントがあります。
おそらく、Select文だけで新しい列のSeq [Elem]の1つの列に2つの列Seq [Elem]を連結する方法がありますか? – codeaperature
はい。 'trackingEvents'(配列 'trackingEvent')の列にはリスト、マップ、構造体があります。これらは変更できるので、ケースクラスをハードコードすることもできません。 – codeaperature