2017-01-30 8 views
1

コンテキスト:メモリに収まらないデータセットがあります。私はKeras RNNを訓練しています。私はAWS EMRクラスタ上でPysparkを使用して、メモリに格納するのに十分小さいバッチでモデルをトレーニングしています。私はelephasを使ってモデルを分散して実装することができず、これは私のモデルがステートフルであることに関係していると思われます。私は完全に確信していません。新しい列にSpark DataFrameをフィルタリングする

データフレームは、私がデータフレーム上の操作の数を行い、データベースを照会した後、29に0からインストールした日からの経過ごとに、ユーザおよび日の行を持っています

query = """WITH max_days_elapsed AS (
     SELECT user_id, 
      max(days_elapsed) as max_de 
     FROM table 
     GROUP BY user_id 
     ) 
     SELECT table.* 
     FROM table 
      LEFT OUTER JOIN max_days_elapsed USING (user_id) 
     WHERE max_de = 1 
      AND days_elapsed < 1""" 

df = read_from_db(query) #this is just a custom function to query our database 

#Create features vector column 
assembler = VectorAssembler(inputCols=features_list, outputCol="features") 
df_vectorized = assembler.transform(df) 

#Split users into train and test and assign batch number 
udf_randint = udf(lambda x: np.random.randint(0, x), IntegerType()) 
training_users, testing_users = df_vectorized.select("user_id").distinct().randomSplit([0.8,0.2],123) 
training_users = training_users.withColumn("batch_number", udf_randint(lit(N_BATCHES))) 

#Create and sort train and test dataframes 
train = df_vectorized.join(training_users, ["user_id"], "inner").select(["user_id", "days_elapsed","batch_number","features", "kpi1", "kpi2", "kpi3"]) 
train = train.sort(["user_id", "days_elapsed"]) 
test = df_vectorized.join(testing_users, ["user_id"], "inner").select(["user_id","days_elapsed","features", "kpi1", "kpi2", "kpi3"]) 
test = test.sort(["user_id", "days_elapsed"]) 

私は問題を私は列車をキャッシュせずにbatch_numberでフィルタリングすることができないようです。私は私達のデータベースにある元のデータセット内にある任意の列でフィルタリングすることができますが、任意の列の上に、私はデータベースを照会した後pysparkで生成されていません:

この:train.filter(train["days_elapsed"] == 0).select("days_elapsed").distinct.show()リターンのみ0

しかし、これらの全ては、任意のフィルタリングなし0と9の間のロット番号の全てを返す:

  • train.filter(train["batch_number"] == 0).select("batch_number").distinct().show()
  • train.filter(train.batch_number == 0).select("batch_number").distinct().show()
  • train.filter("batch_number = 0").select("batch_number").distinct().show()
  • train.filter(col("batch_number") == 0).select("batch_number").distinct().show()

また、これは動作しません:

train.createOrReplaceTempView("train_table") 
batch_df = spark.sql("SELECT * FROM train_table WHERE batch_number = 1") 
batch_df.select("batch_number").distinct().show() 

これらの作業のすべてを私が最初にtrain.cache()を行う場合。それは絶対に必要なのですか、それともキャッシングなしでこれを行う方法はありますか?

答えて

3

スパーク> = 2.3(? - SPARK-22629の進捗に応じて)

asNondeterministic方法を使用して特定の最適化を無効にすることが可能であるべきです。

スパーク2.3

<は、乱数を生成するためにUDFを使用しないでください。まず、引用するにはthe docs

ユーザー定義関数は確定的でなければなりません。最適化のために、重複呼び出しが除去されるか、関数がクエリ内に存在するよりも多く呼び出されることさえあります。

たとえUDF用ではなかったとしても、単一のレコードを処理するときに、この権利を実装することはほとんど不可能なSparkの微妙な要素があります。

スパーク既にrand提供:

はU [0.0、1.0]から独立同一分布(i.i.d.)サンプルのランダム列を生成します。

randn

標準正規分布から独立同一分布(i.i.d.)サンプルとカラムを生成します。

これは、より複雑なジェネレータ機能を構築するために使用できます。

注::コードには他にも問題がいくつかある可能性がありますが、最初から受け入れられません。

関連する問題