2017-09-18 12 views
0

私はPandasのデータフレームにデータを繰り返しロードし、外部の関数(例:xgboost、例のコードには表示されていません)を使って何らかの処理を行い、結果を単一のPySparkオブジェクト(RDDまたはDF)にプッシュします。ディスクに流出する代わりにOOMエラーを出すパンダとPySpark

私は、ソースがPandas DataFrameの場合、RDDまたはDataframeとしてデータを格納するときにPySparkをディスクに流し込みようとしました。何も動作していないようですが、Javaドライバをクラッシュさせてしまい、データをロードできません。また、基本的なtextFile RDDを使用せずにデータをロードしようとしましたが、魅力的でした。私はこれがPySparkのバグかどうか、あるいは回避策があるかどうか疑問に思っています。

サンプルコード:RDDコードに直接負荷作業

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space

:との数回の反復後の

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 
import pyspark 

try: 
    SparkContext.stop(sc) 
except NameError: 
    1 

SparkContext.setSystemProperty('spark.executor.memory', '200g') 
SparkContext.setSystemProperty('spark.driver.memory', '200g') 
sc = SparkContext("local", "App Name") 
sql_sc = SQLContext(sc) 

chunk_100k = pd.read_csv("CData.csv", chunksize=100000) 
empty_df = pd.read_csv("CData.csv", nrows=0) 
infer_df = pd.read_csv("CData.csv", nrows=10).fillna('') 
my_schema = SQLContext.createDataFrame(sql_sc, infer_df).schema 

SparkDF = SQLContext.createDataFrame(sql_sc, empty_df, schema=my_schema) 

for chunk in chunk_100k: 
    SparkDF = SparkDF.union(SQLContext.createDataFrame(sql_sc, chunk, schema=my_schema)) 

クラッシュ

my_rdd = sc.textFile("CData.csv") \ 
.map(lambda line: line.split(",")) \ 
.filter(lambda line: len(line)>1) \ 
.map(lambda line: (line[0],line[1])) 

更新:

私が持っています変更されたth eコードを使用して、RDDの代わりにSpark DataFramesにロードする際にエラーが発生しても、問題は引き続き発生し、エラーメッセージは引き続きRDDを参照していることに注意してください。サンプルコードを変更する前 、以下の理由で「並列化」を使用する場合RDDSへの保存は、少なくとも問題であることが見出された:

Why does SparkContext.parallelize use memory of the driver?

+0

は問題から明確にします。 –

+1

それでも__exactly同じproblem__( 'SparkSession.createDataFrame' - >' SparkSession._createFromLocal' - > 'SparkContext.parallelize')と同じ理由で失敗します。ローカルオブジェクトから分散データ構造を作成するだけでは、方法はありません。スケーラブルな方法でデータをロードする場合は、Spark csv readerを使用します。 – zero323

+0

スパークcsvリーダーを使用する以外の方法はありますか?私はcsvではなく、ディスクに流出しながらパンダから読みたい。ファイルをpandasからディスクに書き込んだり、Sparkに再ロードすることは別のステップです。 –

答えて

-1

apache-におけるファイルの火花defaults.confにファイルを作成しますスパーク/ 1.5.1/libexecに/ confに/それに次の行を追加します。並列使用しているとき、私はコンテキストに変更したzero323 @ spark.driver.memory 45G spark.driver.maxResultSize 10G

関連する問題