2017-11-13 13 views
0

私はセッションと呼ばれるデータフレームを持ち、時間とともに変化する可能性があります。 (を編集して明示します:私は列の大文字小文字のクラスを持っていません - 反映されたスキーマのみ)私は一貫して追跡スコープを構成するかもしれないいくつかの他の内側スコープ列と外側スコープ列を持つ外側スコープのuuidとclientIdを持ちます私は今、これらのパラメータと、まだツーことが解決コードのロジックを含むUDFでoldTrackingEventsとnewTrackingEventsをマージしたい行オブジェクトに埋め込まれた未定義のケースクラスの配列を連結するためのUDF

root 
|-- runtimestamp: long (nullable = true) 
|-- clientId: long (nullable = true) 
|-- uuid: string (nullable = true) 
|-- oldTrackingEvents: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- timestamp: long (nullable = true) 
| | |-- actionid: integer (nullable = true) 
| | |-- actiontype: string (nullable = true) 
| | |-- <tbd ... maps, arrays and other stuff matches sibling> section 
... 
|-- newTrackingEvents: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- timestamp: long (nullable = true) 
| | |-- actionid: integer (nullable = true) 
| | |-- actiontype: string (nullable = true) 
| | |-- <tbd ... maps, arrays and other stuff matches sibling>  
... 

::のようなので...何か

val mergeTEs = udf((oldTEs : Seq[Row], newTEs : Seq[Row]) => 
     // do some stuff - figure best way 
     // - to merge both groups of tracking events 
     // - remove duplicates tracker events structures 
     // - limit total tracking events < 500 
     return result // same type as UDF input params 
    ) 

UDFの戻り結果は連結された2つのfの結果のリストである構造体の配列になりますields。

QUESTION: 私の質問は、このようなUDF構築する方法である - 渡されたパラメータの型が正しい、(2)UDF内でこれらのコレクションを操作する方法と、(3)明確な方法の(1)使用をコンパイラエラーのない値を返す。私は、Seq[Row]の入出力(val testUDF = udf((trackingEvents : Seq[Row]) => trackingEvents))のエラーを受け取り、trackingEventsの直接返品のエラーjava.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supportedを受け取りましたが、trackingEventsの代わりにSome(1)を返すエラーは発生しません。 UDFは、コメント欄での活動を使用して上記のスキーマによって提案されたように私は、同一構造の2つのリストを連結することができます目標は、この操作を使用することです:。

sessions.select(mergeTEs('oldTrackingEvents, 'newTrackingEvents).as("cleanTrackingEvents")) 

そして、各行の

、...取り戻します'trackingEvents'構造の単一の配列をメモリ/スピード効率のよい方法で実装します。

補足:関連性が存在する場合は

私に示した質問を見てみると...この他のポストが有用/関連がある...おそらく...、Defining a UDF that accepts an Array of objects in a Spark DataFrame? ... To create struct function passed to udf has to return Product type (Tuple* or case class), not Row. を可能にヒントがあります。

+0

おそらく、Select文だけで新しい列のSeq [Elem]の1つの列に2つの列Seq [Elem]を連結する方法がありますか? – codeaperature

+0

はい。 'trackingEvents'(配列 'trackingEvent')の列にはリスト、マップ、構造体があります。これらは変更できるので、ケースクラスをハードコードすることもできません。 – codeaperature

答えて

1

私はquestion you've linkedがそれをすべて説明していると思いますので、ちょうど繰り返してください。 udfで作業する場合:StructTypeため

  • 入力表現が弱いオブジェクトを入力されます。
  • StructTypeの出力タイプはScala Productである必要があります。 オブジェクトを返すことはできません。

これはかなりの負担にある場合、あなたは強くTSessionスキーマを表す代数的データ型で、Uは結果を表す代数的データ型であるDataset

val f: T => U 
sessions.as[T].map(f): Dataset[U] 

を入力使用する必要があります。

+0

あなたが何を意味しているのか理解し始めています。私は問題は私が '代数的データ型 'を持っていないと思う...私は' sessions.select(' oldTrackingEvents).schema'(これは柔軟な型である)によって推論されたスキーマしか持たず、私はこれを考えているスキーマをscala.productに変換する必要があります。 BTW - 私はまた、スパーク1.6を使用することを余儀なくされています。 – codeaperature

0

...

マスターデータフレームから、各trackingEventsセクションのデータフレームを作成する場合は、newと、それ以外の場合は、データフレームを作成してください。 old。それぞれで、展開された 'trackingEvents'セクションの列を選択します。これらのvalデータフレーム宣言をnewTEoldTEとして保存します。

各々はuuidclientIdとイベントtimestampだとして取り出される列はoldTrackingEventsなどnewTrackingEventsのアレイにおける各追跡イベントに固有の他のデータフレームを作成します。あなたの擬似スキーマは次のようになります。

(uuid: String, clientId : Long, newTE : Seq[Long], oldTE : Seq[Long])

はあなたの構造の2つの単純なシーケンスを結合するためにUDFを使用して、例えば、「未テストのようなもの」であり、両方のSeq[Long]

val limitEventsUDF = udf { (newTE: Seq[Long], oldTE: Seq[Long], limit: Int, tooOld: Long) => { 
    (newTE ++ oldTE).filter(_ > tooOld).sortWith(_ > _).distinct.take(limit) 
}} 

UDF掃除された追跡イベントのデータフレームを返す&あなたは、撤去されたイベントを持つ非常にスリムなデータフレームを持っており、お互いに結合された後、爆発したnewTEoldTEフレームに自己参加します。

必要に応じてcollect_listを使用してGroupByします。

まだ...これはたくさんの作業のようです - これはこのために投票する必要があります"the answer" - 私はわかりません?

関連する問題