2016-12-09 28 views
5

私は単一のスパークデータフレームに読み込んで寄木細工に保存する必要がある大きなネストされたNDJ(改行で区切られたJSON)ファイルを用意しました。スキーマをレンダリングするための試みで、私はこの機能を使用します。大規模なJSONファイルをSpark Dataframeに読み込む

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

で読み取ることにより、返されたデータフレームに

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { 
     schema.fields.flatMap(f => { 
      val colName = if (prefix == null) f.name else (prefix + "." + f.name) 
      f.dataType match { 
      case st: StructType => flattenSchema(st, colName) 
      case _ => Array(col(colName)) 
      } 
     }) 
    } 

を私はまたval df = spark.read.json(path)にこれを切り替えたので、これだけの作品がその複数行のJSONではなく、同じエラーです。

これは、労働者のメモリ不足エラーの原因です。 java.lang.OutOfMemoryError: Java heap space私は無駄

にJVMメモリオプションと火花エグゼキュータ/ドライバのオプションを変更してある

は、ファイルをストリームスキーマを平らにし、インクリメンタルデータフレームに追加する方法はありますか? JSONのいくつかの行には、前のエンティティからの新しいフィールドが含まれています。そのため、後で入力する必要があります。

答えて

0

これは複数の方法で実現できます。

最初に読んでいる間に、jsonを読み込むためにデータフレームのスキーマを提供することができます。または、sparkが単独でスキーマを推論できるようにすることができます。

jsonがデータフレームに入ったら、次の方法でデータを平坦化できます。

a。 dataframeでexplode()を使用すると、それを平坦化できます。 b。 spark sqlを使用して、ネストされたフィールドにアクセスします。オペレーター。例が見つかりますhere

最後に、データフレーム に新しい列を追加する場合は、次のように入力します。最初のオプションは、withColumn()を使用するアプローチの1つです。ただし、これは追加された新しい列とデータセット全体に対して実行されます。 b。既存のものから新しいデータフレームを生成するためにsqlを使用する - これは最も簡単かもしれません c。最後に、マップを使用して要素にアクセスし、古いスキーマを取得し、新しい値を追加し、新しいスキーマを作成し、最終的に新しいdfを取得します。

withColumnはrdd全体で機能します。だから一般的には、追加するすべての列に対してこのメ​​ソッドを使用するのは良い方法ではありません。マップ関数内で列とそのデータを操作する方法があります。 1つのマップ関数がここでジョブを実行しているので、新しい列とそのデータを追加するコードは並行して実行されます。

a。計算に基づいて新しい値を集めることができます

b。ここ

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

行以下のようにメインRDDにこれらの新しい列値を追加し、マップ法

Cの行の基準となります。

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

dのように新しいスキーマを作成します。古いスキーマに追加する

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e。新しい列で新しいデータフレームを作成する

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

'wholeTextFiles'の結果である' java.lang.OutOfMemoryError'にどう対処していますか? –

+0

私は「ファイルをストリームし、スキーマを平坦化し、データフレームに段階的に追加する方法はありますか?」JSONのいくつかの行には、前のエンティティからの新しいフィールドが含まれています。 "私はメモリの問題の解決に関して何の疑問も見ません。そこで、彼に複数のアプローチを与えました。 – Ramzy

+0

NDJがJSONLの場合、OPはwholeTextFilesを使用するべきではありません。そうでないなら、これは助けにならないでしょう。 –

2

いいえ回避する。問題は、JVMオブジェクトの制限にあります。私はscala jsonパーサーを使用して終了し、手動でデータフレームを構築しました。

関連する問題