2017-07-27 6 views
0

Spark 2.1.xはこちら。私はそうのような単一のスパークDatasetに読んでいます(同じスキーマを持つ)JSONファイルの束を持っている:レコードを反復する間にSpark Datasetに複数の列を追加する

val ds = spark.read.json("some/path/to/lots/of/json/*.json") 

私はその後、dsスキーマを印刷し、すべてが正しく読み込まれたことを確認できます。

ds.printSchema() 

// Outputs: 
root 
|-- fizz: boolean (nullable = true) 
|-- moniker: string (nullable = true) 
|-- buzz: string (nullable = true) 
|-- foo: string (nullable = true) 
|-- bar: string (nullable = true) 

文字列はmonikerですのでご注意ください。

  1. このデータセットおよび/またはそのスキーマに3つの新しい列を追加します。 (a)special_dateと呼ばれる日付/時刻列、(b)special_uuidと呼ばれるUUID列、および(c)special_phraseと呼ばれる文字列。次いで
  2. Iはds内のすべてのレコードを反復処理する必要があり、レコードごとに、3つの連続関数にそのmoniker値を渡し(A)deriveSpecialDate(val moniker : String) : Date、(b)は、​​と(C)deriveSpecialPhrase(val moniker : String) : String。これらの関数のそれぞれの出力は、それぞれの列のレコードの値になる必要があります。

私の最高の試み:

val ds = spark.read.json("some/path/to/lots/of/json/*.json") 

ds.foreach(record => { 
    val moniker : String = record.select("moniker") 
    val specialDate : Date = deriveSpecialDate(moniker) 
    val specialUuid : UUID = deriveSpecialUuid(moniker) 
    val specialPhrase : String = deriveSpecialPhrase(moniker) 

    // This doesn't work because special_* fields don't exist in the original 
    // schema dervied from the JSON files. We're ADDING these columns after the 
    // JSON read and then populating their values dynamically. 
    record.special_date = specialDate 
    record.special_uuid = specialUuid 
    record.special_phrase = specialPhrase 
}) 

これを達成することができますどのように任意のアイデア?それはあなたをもたらすでしょう

ds.withColumn("special_date", deriveSpecialDate(col("moniker))) 
.withColumn("special_uuid", deriveSpecialUuid(col("moniker))) 
.withColumn("special_phrase", deriveSpecialPhrase (col("moniker))) 

+1

だから基本的に各関数を呼び出すことによって3つの列を追加したいですか?\ –

+0

こんにちは@ShankarKoirala(+1) - はいまさに! – smeeb

答えて

1

私はあなたがこのような何かを行うことができ、スパークその後

val deriveSpecialDate = udf((moniker: String) => // implement here) 
val deriveSpecialUuid= udf((moniker: String) => // implement here) 
val deriveSpecialPhrase = udf((moniker: String) => // implement here) 

からUDF(ユーザー定義関数)を使用して、3列の元のデータセットを高めるであろう3つの列を持つ新しいデータフレーム必要に応じて、マップ関数を使用してデータセットに変換することもできます

+0

ありがとう@dumitru(+1) - このソリューション( 'ds.withColumn ...')もデータセットの元の列を保持しますか?元のデータセットに5つの列があり、3つ追加したい(合計8つ)。それとも、3つの列しか持たないようにスキーマを変更する(元の5を削除する)のですか? – smeeb

+0

また、列を保持するために、明示的にdropを呼び出して特定の列を削除する必要があります – dumitru

0

withColumnを使用すると、新しい列を作成できます。すでに機能を持っている場合と、あなたは

val sd = sqlContext.udf.register("deriveSpecialDate",deriveSpecialDate _) 
val su = sqlContext.udf.register("deriveSpecialUuid",deriveSpecialUuid _) 
val sp = sqlContext.udf.register("deriveSpecialPhrase", deriveSpecialPhrase _) 

あなたが必要とするこのUDFを使用するには、どのwithcolumnで

ds.withColumn("special_date", sd($"moniker)) 
.withColumn("special_uuid", su($"moniker)) 
.withColumn("special_phrase", sp($"moniker)) 

として新しい列を作成し、UDF(ユーザー定義関数)として、その関数を登録する必要がありますこれで3つの新しい追加された列で元のデータセットを取得できます。

関連する問題