2017-12-10 15 views
-1

現在11,000ファイルを処理中です。各ファイルは、前のものとの連合となるデータフレームを生成します。以下はコードです大きなデータフレームをスパークで効果的にキャッシュする

var df1 = sc.parallelize(Array(("temp",100))).toDF("key","value").withColumn("Filename", lit("Temp"))  
files.foreach(filename => { 
      val a = filename.getPath.toString() 
      val m = a.split("/") 
      val name = m(6) 
      println("FILENAME: " + name)     
      if (name == "_SUCCESS") { 
       println("Cannot Process '_SUCCSS' Filename") 
      } else { 
       val freqs=doSomething(a).toDF("key","value").withColumn("Filename", lit(name)) 
       df1=df1.unionAll(freqs) 
      } 

}) 

最初に、私は11000ファイルにjava.lang.StackOverFlowErrorのエラーがあります。その後、私はdf1=df1.unionAll(freqs)後に次の行を追加します。

df1=df1.cache() 

それは問題を解決しますが、各反復の後に、それが遅くなっています。誰かが時間の短縮なしでStackOverflowErrorを避けるために何をすべきか教えてください。 ありがとう!

+0

系統が非常に深くなって非効率的になる。あなたは、(スパーク2の)系統を切り詰めるために「チェックポイント」を試みることができます。別の方法として、すべての個別のデータフレームをディスクに書き込むこともできます(ファイル名でパーティション化されたテーブルなど)。 –

答えて

0

スパークは、データフレームを一連の変換として管理することが問題です。最初のデータフレームの "toDF"で始まり、その上で変換を実行します(たとえばwithColumn)。その後、以前のデータフレームなどでunionAllを実行します。

unionAllは単なる変換であり、ツリーは非常に長くなります11Kユニオンあなたは深さ11Kの実行ツリーを持っています)。情報を構築する際のunionAllは、スタックオーバーフロー状況に陥る可能性があります。

キャッシングはこれを解決しませんが、途中で何らかのアクションを追加していると思います(そうしなければ、トランスフォーメーションの作成以外に何も実行されません)。キャッシングを実行すると、sparkはいくつかのステップをスキップしてスタックオーバーフローが後で単に到着するだけです。

繰り返し処理のためにRDDに戻ってもかまいません(実際の例は反復的ではありませんが、純粋に並列です。途中で別々のデータフレームを保存し、RDDに変換してRDDユニオンを使用するだけです)。

あなたのケースはtrue反復なしで多数のデータフレームを結合するように見えるので、あなたはツリーの方法で組合を行うこともできます(つまり、組合組、組組組など)。これはO (N)からO(log N)までです。ここで、Nは共用体の数です。

最後に、データフレームの読み取りと書き込みをディスクから行うことができます。アイデアは、すべてのX(たとえば20)の組合の後で、df1.write.parquet(filex)を実行してからdf1 = spark.read.parquet(filex)を実行するということです。あなたが単一のデータフレームの系譜を読むとき、それはファイル自体を読むでしょう。コストは、ファイルの書き込みと読み取りになります。

+0

私はスパークするのは初めてで、まだ非常に混乱しています。ツリーの方法で連合のコードを提供してください、そしてこのパラレル反復で寄木細工がどのように機能するのでしょうか?ありがとう@AssafMendelson –

+0

コードを手に入れることはできませんが、ユニオンを行うデータフレームのベクトル:df(i).union(df(i + 1))をループすることが考えられます。これにより、要素数の半分のベクトルが得られます。その後、1つのデータフレームを取得するまでこのプロセスを繰り返します。 –

関連する問題