2017-07-20 13 views
1

私は、JsonファイルをSparkに読み込んだとき、それをParquetとして格納し、次にImpalaからParquetファイルにアクセスしようと試みます。インパラは、SQLで不正な文字が含まれているため、列の名前について不満を持ちます。DataFrameのスキーマを変更する方法(ネストされたフィールドの名前を修正する方法)

JSONファイルの「機能」の1つは、あらかじめ定義されたスキーマを持たないということです。 Sparkにスキーマを作成させてから、不正な文字を含むフィールド名を変更する必要があります。

私の最初の考えはDataFrameのフィールドの名前にwithColumnRenamedを使用することでしたが、これはトップレベルのフィールドでしか動作しないため、Jsonにネストされたデータが含まれているため使用できませんでした。

DataFramesスキーマを再作成するために、次のコードを作成しました。構造を再帰的に使用しました。そして、新しいスキーマを使ってDataFrameを再作成します。

(Scalaのコピーコンストラクタを使用してのヤツェクの提案improvmentで更新されたコード。)この作品

def replaceIllegal(s: String): String = s.replace("-", "_").replace("&", "_").replace("\"", "_").replace("[", "_").replace("[", "_") 
def removeIllegalCharsInColumnNames(schema: StructType): StructType = { 
    StructType(schema.fields.map { field => 
    field.dataType match { 
     case struct: StructType => 
     field.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(struct)) 
     case _ => 
     field.copy(name = replaceIllegal(field.name)) 
    } 
    }) 
} 

sparkSession.createDataFrame(df.rdd, removeIllegalCharsInColumnNames(df.schema)) 

。しかし、私がしたいことを達成するためのより良い/より簡単な方法がありますか?

DataFrameの既存のスキーマを置き換えるより良い方法はありますか?次のコードは動作しませんでした:

df.select($"*".cast(removeIllegalCharsInColumnNames(df.schema))) 

それは、このエラーを与える:

org.apache.spark.sql.AnalysisException: Invalid usage of '*' in expression 'cast' 

答えて

1

私は最善の策は、(あなたが寄木細工のファイルとして保存する前に)データセットを変換することだと思うRDDにし、カスタムスキーマを使用して、必要に応じて構造を記述します。

val targetSchema: StructType = ... 
val fromJson: DataFrame = ... 
val targetDataset = spark.createDataFrame(fromJson.rdd, targetSchema) 

は、あなたがデータセットからそれを作成しようとしている間、しかし、それは直接RDDを使用しています参考にSparkSession.createDataFrameの例を参照してください。

val schema = 
    StructType(
    StructField("name", StringType, false) :: 
    StructField("age", IntegerType, true) :: Nil) 

val people = 
    sc.textFile("examples/src/main/resources/people.txt").map(
    _.split(",")).map(p => Row(p(0), p(1).trim.toInt)) 
val dataFrame = sparkSession.createDataFrame(people, schema) 
dataFrame.printSchema 
// root 
// |-- name: string (nullable = false) 
// |-- age: integer (nullable = true) 

しかし、あなたはあなたのコメントで述べたように(私は後であなたの質問にマージされていること):それと

JSON files don't have a predefined schema.

は、私はあなたのソリューションが正しいものだと思います、と述べました。 Sparkはこれと同じようなものは何も提供していないので、StructType/StructFieldのツリーを走査して間違ったものを変更するカスタムのScalaコードを開発する方法があると思います。

コード内で変更することをお勧めするのは、copyコンストラクタ(Scalaのケースクラスの機能 - A Scala case class ‘copy’ method exampleを参照)を使用して、他のプロパティを変更せずに間違った名前のみを変更することです。

// was 
// case s: StructType => 
// StructField(replaceIllegal(field.name), removeIllegalCharsInColumnNames(s), field.nullable, field.metadata) 
s.copy(name = replaceIllegal(field.name), dataType = removeIllegalCharsInColumnNames(s)) 

は、(特に)とスカラ(一般的に)の関数型言語でいくつかのデザインパターンがある扱うことができます:(略)次のコードに対応するcopyコンストラクタを使用

深いネストされた構造の操作、しかしそれはあまりにも(私はそれを共有することを躊躇して)です。

したがって、必ずしもSparkスキーマではないデータ構造としてツリーを操作する方法についての質問は、現在の「シェイプ」にあると思います。

+0

ありがとう@Jacek - 新しいスキーマを適用する良い方法はありませんか?私はコード "df.select($" * "。cast(removeIllegalCharsInColumnNames(df.schema)))"は動作するはずですが、*を使用することはできません。 DataFrameのルートを選択する方法はありませんか? –

+0

'df.schema'を使って' select( "*" ... ')ではなく新しいスキーマを作成することができます。 –

関連する問題