Spark 2.1.xはこちら。私はそうのような単一のスパークDataset
に読んでいます(同じスキーマを持つ)JSONファイルの束を持っている:レコードを反復する間にSpark Datasetに複数の列を追加する
val ds = spark.read.json("some/path/to/lots/of/json/*.json")
私はその後、ds
スキーマを印刷し、すべてが正しく読み込まれたことを確認できます。
ds.printSchema()
// Outputs:
root
|-- fizz: boolean (nullable = true)
|-- moniker: string (nullable = true)
|-- buzz: string (nullable = true)
|-- foo: string (nullable = true)
|-- bar: string (nullable = true)
文字列はmoniker
ですのでご注意ください。
- このデータセットおよび/またはそのスキーマに3つの新しい列を追加します。 (a)
special_date
と呼ばれる日付/時刻列、(b)special_uuid
と呼ばれるUUID列、および(c)special_phrase
と呼ばれる文字列。次いで - Iは
ds
内のすべてのレコードを反復処理する必要があり、レコードごとに、3つの連続関数にそのmoniker
値を渡し(A)deriveSpecialDate(val moniker : String) : Date
、(b)は、と(C)deriveSpecialPhrase(val moniker : String) : String
。これらの関数のそれぞれの出力は、それぞれの列のレコードの値になる必要があります。
私の最高の試み:
val ds = spark.read.json("some/path/to/lots/of/json/*.json")
ds.foreach(record => {
val moniker : String = record.select("moniker")
val specialDate : Date = deriveSpecialDate(moniker)
val specialUuid : UUID = deriveSpecialUuid(moniker)
val specialPhrase : String = deriveSpecialPhrase(moniker)
// This doesn't work because special_* fields don't exist in the original
// schema dervied from the JSON files. We're ADDING these columns after the
// JSON read and then populating their values dynamically.
record.special_date = specialDate
record.special_uuid = specialUuid
record.special_phrase = specialPhrase
})
これを達成することができますどのように任意のアイデア?それはあなたをもたらすでしょう
ds.withColumn("special_date", deriveSpecialDate(col("moniker)))
.withColumn("special_uuid", deriveSpecialUuid(col("moniker)))
.withColumn("special_phrase", deriveSpecialPhrase (col("moniker)))
:
だから基本的に各関数を呼び出すことによって3つの列を追加したいですか?\ –
こんにちは@ShankarKoirala(+1) - はいまさに! – smeeb