データフレームをlibsvm形式で繰り返し保存する必要があります。私のコードは、コマンドall_recs_df = sqlContext.sql("select * from temp_table")
を実行している今、このデータフレームをlibsvm形式で繰り返し保存する
im_df = im_table.select("m_id", "fsz", "fnm")
all_recs_df = None
fake_df = None
firstRec = True
for eachRec in (im_df.rdd.zipWithIndex().map(lambda ((mi, fs, fn), i): (mi, fs, fn)).collect()):
m_id = eachRec[0]
fsz = eachRec[1]
fnm = eachRec[2]
volume_df = volume_table.select("id","m_id").filter(volume_table['m_id']==m_id)
m_bytes = 0
for eachVolRec in (volume_df.rdd.zipWithIndex().map(lambda ((id), i): (id)).collect()):
each_v_id = eachVolRec[0]
volume_m_id = eachVolRec[1]
vsnp_df = vsnp_table.select("v_id","ssb").filter(vsnp_table['v_id']==each_v_id)
vsnp_sum_df = vsnp_df.groupBy("v_id").agg(sum("ssb").alias("ssb_sum"))
v_bytes = vsnp_sum_df.rdd.zipWithIndex().map(lambda ((vi, vb), i): (vi, vb)).collect()[0][1]
print "\t total = %s" %(v_bytes)
m_bytes += v_bytes
print "im.fnm = %s, im.fsz = %s , total_snaphot_size_bytes: %s" %(fnm, fsz, m_bytes)
if firstRec:
firstRec = False
all_recs_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)]))
fake_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, 1000 * m_bytes), label=1.0)]))
all_recs_df = all_recs_df.unionAll(fake_df)
all_recs_df.registerTempTable("temp_table")
else:
each_rec_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)]))
all_recs_df = sqlContext.sql("select * from temp_table")
all_recs_df = all_recs_df.unionAll(each_rec_df)
all_recs_df.registerTempTable("temp_table")
のようなものであることはno such table temp_table
とコマンドall_recs_df.collect()
を実行しているが与えるエラーに'NoneType' object has no attribute 'collect'
どうやらall_recs_df
とtemp_table
はプログラム一度コンテキスト外のエラーを与えますfor
ループを終了します。
QUESTION:だから
反復的に、私はすぐにディスクにデータフレームを保存しようとしたLIBSVM形式でデータフレームを保存するために代替するものであるが、私は
MLUtils.saveAsLibSVMFile(d, "/tmp/test1")
ここでは、同じファイルにデータを追加することはできませんdはLabeledPoint RDDです。私はすぐにディスクにデータフレームを保存しようとしました