コンテキスト:メモリに収まらないデータセットがあります。私はKeras RNNを訓練しています。私はAWS EMRクラスタ上でPysparkを使用して、メモリに格納するのに十分小さいバッチでモデルをトレーニングしています。私はelephas
を使ってモデルを分散して実装することができず、これは私のモデルがステートフルであることに関係していると思われます。私は完全に確信していません。新しい列にSpark DataFrameをフィルタリングする
データフレームは、私がデータフレーム上の操作の数を行い、データベースを照会した後、29に0からインストールした日からの経過ごとに、ユーザおよび日の行を持っています
query = """WITH max_days_elapsed AS (
SELECT user_id,
max(days_elapsed) as max_de
FROM table
GROUP BY user_id
)
SELECT table.*
FROM table
LEFT OUTER JOIN max_days_elapsed USING (user_id)
WHERE max_de = 1
AND days_elapsed < 1"""
df = read_from_db(query) #this is just a custom function to query our database
#Create features vector column
assembler = VectorAssembler(inputCols=features_list, outputCol="features")
df_vectorized = assembler.transform(df)
#Split users into train and test and assign batch number
udf_randint = udf(lambda x: np.random.randint(0, x), IntegerType())
training_users, testing_users = df_vectorized.select("user_id").distinct().randomSplit([0.8,0.2],123)
training_users = training_users.withColumn("batch_number", udf_randint(lit(N_BATCHES)))
#Create and sort train and test dataframes
train = df_vectorized.join(training_users, ["user_id"], "inner").select(["user_id", "days_elapsed","batch_number","features", "kpi1", "kpi2", "kpi3"])
train = train.sort(["user_id", "days_elapsed"])
test = df_vectorized.join(testing_users, ["user_id"], "inner").select(["user_id","days_elapsed","features", "kpi1", "kpi2", "kpi3"])
test = test.sort(["user_id", "days_elapsed"])
私は問題を私は列車をキャッシュせずにbatch_numberでフィルタリングすることができないようです。私は私達のデータベースにある元のデータセット内にある任意の列でフィルタリングすることができますが、任意の列の上に、私はデータベースを照会した後pysparkで生成されていません:
この:train.filter(train["days_elapsed"] == 0).select("days_elapsed").distinct.show()
リターンのみ0
しかし、これらの全ては、任意のフィルタリングなし0と9の間のロット番号の全てを返す:
train.filter(train["batch_number"] == 0).select("batch_number").distinct().show()
train.filter(train.batch_number == 0).select("batch_number").distinct().show()
train.filter("batch_number = 0").select("batch_number").distinct().show()
train.filter(col("batch_number") == 0).select("batch_number").distinct().show()
また、これは動作しません:
train.createOrReplaceTempView("train_table")
batch_df = spark.sql("SELECT * FROM train_table WHERE batch_number = 1")
batch_df.select("batch_number").distinct().show()
これらの作業のすべてを私が最初にtrain.cache()を行う場合。それは絶対に必要なのですか、それともキャッシングなしでこれを行う方法はありますか?