私は単一のスパークデータフレームに読み込んで寄木細工に保存する必要がある大きなネストされたNDJ(改行で区切られたJSON)ファイルを用意しました。スキーマをレンダリングするための試みで、私はこの機能を使用します。大規模なJSONファイルをSpark Dataframeに読み込む
val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)
で読み取ることにより、返されたデータフレームに
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case _ => Array(col(colName))
}
})
}
を私はまたval df = spark.read.json(path)
にこれを切り替えたので、これだけの作品がその複数行のJSONではなく、同じエラーです。
これは、労働者のメモリ不足エラーの原因です。 java.lang.OutOfMemoryError: Java heap space
私は無駄
にJVMメモリオプションと火花エグゼキュータ/ドライバのオプションを変更してある
は、ファイルをストリームスキーマを平らにし、インクリメンタルデータフレームに追加する方法はありますか? JSONのいくつかの行には、前のエンティティからの新しいフィールドが含まれています。そのため、後で入力する必要があります。
'wholeTextFiles'の結果である' java.lang.OutOfMemoryError'にどう対処していますか? –
私は「ファイルをストリームし、スキーマを平坦化し、データフレームに段階的に追加する方法はありますか?」JSONのいくつかの行には、前のエンティティからの新しいフィールドが含まれています。 "私はメモリの問題の解決に関して何の疑問も見ません。そこで、彼に複数のアプローチを与えました。 – Ramzy
NDJがJSONLの場合、OPはwholeTextFilesを使用するべきではありません。そうでないなら、これは助けにならないでしょう。 –