私は、JsonファイルをSparkに読み込んだとき、それをParquetとして格納し、次にImpalaからParquetファイルにアクセスしようと試みます。インパラは、SQLで不正な文字が含まれているため、列の名前について不満を持ちます。DataFrameのスキーマを変更する方法(ネストされたフィールドの名前を修正する方法)
JSONファイルの「機能」の1つは、あらかじめ定義されたスキーマを持たないということです。 Sparkにスキーマを作成させてから、不正な文字を含むフィールド名を変更する必要があります。
私の最初の考えはDataFrameのフィールドの名前にwithColumnRenamed
を使用することでしたが、これはトップレベルのフィールドでしか動作しないため、Jsonにネストされたデータが含まれているため使用できませんでした。
DataFramesスキーマを再作成するために、次のコードを作成しました。構造を再帰的に使用しました。そして、新しいスキーマを使ってDataFrameを再作成します。
(Scalaのコピーコンストラクタを使用してのヤツェクの提案improvmentで更新されたコード。)この作品
def replaceIllegal(s: String): String = s.replace("-", "_").replace("&", "_").replace("\"", "_").replace("[", "_").replace("[", "_")
def removeIllegalCharsInColumnNames(schema: StructType): StructType = {
StructType(schema.fields.map { field =>
field.dataType match {
case struct: StructType =>
field.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(struct))
case _ =>
field.copy(name = replaceIllegal(field.name))
}
})
}
sparkSession.createDataFrame(df.rdd, removeIllegalCharsInColumnNames(df.schema))
。しかし、私がしたいことを達成するためのより良い/より簡単な方法がありますか?
DataFrameの既存のスキーマを置き換えるより良い方法はありますか?次のコードは動作しませんでした:
df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema)))
それは、このエラーを与える:
org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'cast'
ありがとう@Jacek - 新しいスキーマを適用する良い方法はありませんか?私はコード "df.select($" * "。cast(removeIllegalCharsInColumnNames(df.schema)))"は動作するはずですが、*を使用することはできません。 DataFrameのルートを選択する方法はありませんか? –
'df.schema'を使って' select( "*" ... ')ではなく新しいスキーマを作成することができます。 –