2016-09-08 19 views
0

を読んで、私は以下のように作成したスパークデータフレームの最初の行を読み取るしようとしています:私はこのコードを実行する場合スパーク:エラーデータフレーム

#read file  
datasetDF = sqlContext.read.format('com.databricks.spark.csv').options(delimiter=';', header='true',inferschema='true').load(dataset) 
#vectorize 
ignore = ['value'] 
vecAssembler = VectorAssembler(inputCols=[x for x in datasetDF.columns if x not in ignore], outputCol="features") 
#split training - test set 
(split20DF, split80DF) = datasetDF.randomSplit([1.0, 4.0],seed) 
testSetDF = split20DF.cache() 
trainingSetDF = split80DF.cache() 

print trainingSetDF.take(5) 

しかし、私は最後の行によって、次のエラー(原因を取得します印刷trainingSetDF.take(5)):

: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 3.0 failed 4 times, most recent failure: 
Lost task 0.3 in stage 3.0 (TID 7, 192.168.101.102): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: 
org.codehaus.janino.JaninoRuntimeException: 
Code of method " 
(Lorg/apache/spark/sql/catalyst/InternalRow;Lorg/apache/spark/sql/catalyst/InternalRow;)I" of class 
**"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificOrdering" grows beyond 64 KB** 

私は機能(256以上)のかなり多くを持っている場合にのみ、この現象が発生することを追加する必要があります。 私は何が間違っていますか?あなたはデータを結合するために使用できるIDの変数を持っている場合は

おかげで、 フロラン

+0

はhttps://issues.apache.org/jira/browse/SPARK-16845と似ています –

答えて

0

は私が全体のデータのrandomSplit()エラーの回避策を見つけました(そうしない場合、あなたは簡単に作ることができます1つは分割前であり、まだ回避策を使用しています)。分割された部分に別々の変数名をつけて、同じものを使用しないようにしてください(私はtrain1/valid1を使いました)。そうでなければ、同じRDDへのポインタを作成するだけで同じエラーになります。これはおそらく私が今まで見た中で最も愚かなバグの1つです。私たちのデータはそれほど広いものではありませんでした。

Y   = 'y' 
ID_VAR  = 'ID' 
DROPS  = [ID_VAR] 

train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('s3n://emr-related-files/train.csv') 
test = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('s3n://emr-related-files/test.csv') 
train.show() 
print(train.count) 

(train1,valid1) = train.select(ID_VAR).randomSplit([0.7,0.3], seed=123) 
train = train1.join(train,ID_VAR,'inner') 
valid = valid1.join(train, ID_VAR,'inner') 

train.show()