0

データフレームをlibsvm形式で繰り返し保存する必要があります。私のコードは、コマンドall_recs_df = sqlContext.sql("select * from temp_table")を実行している今、このデータフレームをlibsvm形式で繰り返し保存する

im_df = im_table.select("m_id", "fsz", "fnm") 
all_recs_df = None 
fake_df = None 
firstRec = True 
for eachRec in (im_df.rdd.zipWithIndex().map(lambda ((mi, fs, fn), i): (mi, fs, fn)).collect()): 
    m_id = eachRec[0] 
    fsz = eachRec[1] 
    fnm = eachRec[2] 

    volume_df = volume_table.select("id","m_id").filter(volume_table['m_id']==m_id) 
    m_bytes = 0 
    for eachVolRec in (volume_df.rdd.zipWithIndex().map(lambda ((id), i): (id)).collect()): 
     each_v_id = eachVolRec[0] 
     volume_m_id = eachVolRec[1] 
     vsnp_df = vsnp_table.select("v_id","ssb").filter(vsnp_table['v_id']==each_v_id) 
     vsnp_sum_df = vsnp_df.groupBy("v_id").agg(sum("ssb").alias("ssb_sum")) 
     v_bytes = vsnp_sum_df.rdd.zipWithIndex().map(lambda ((vi, vb), i): (vi, vb)).collect()[0][1] 
     print "\t total = %s" %(v_bytes) 
     m_bytes += v_bytes 

    print "im.fnm = %s, im.fsz = %s , total_snaphot_size_bytes: %s" %(fnm, fsz, m_bytes) 
    if firstRec: 
     firstRec = False 
     all_recs_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)])) 
     fake_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, 1000 * m_bytes), label=1.0)])) 
     all_recs_df = all_recs_df.unionAll(fake_df) 
     all_recs_df.registerTempTable("temp_table") 
    else: 
     each_rec_df = sqlContext.createDataFrame(sc.parallelize([Row(features=Vectors.dense(fsz, m_bytes), label=0.0)])) 
     all_recs_df = sqlContext.sql("select * from temp_table") 
     all_recs_df = all_recs_df.unionAll(each_rec_df) 
     all_recs_df.registerTempTable("temp_table") 

のようなものであることはno such table temp_table

とコマンドall_recs_df.collect()を実行しているが与えるエラーに'NoneType' object has no attribute 'collect'

どうやらall_recs_dftemp_tableはプログラム一度コンテキスト外のエラーを与えますforループを終了します。

QUESTION:だから

反復的に、私はすぐにディスクにデータフレームを保存しようとしたLIBSVM形式でデータフレームを保存するために代替するものであるが、私は

MLUtils.saveAsLibSVMFile(d, "/tmp/test1") 

ここでは、同じファイルにデータを追加することはできませんdはLabeledPoint RDDです。私はすぐにディスクにデータフレームを保存しようとしました

答えて

1

既存LIBSVM形式のファイルにデータを追加する方法はありますが、私はにデータを追加することはできません。forループ内で上記のコマンドを実行すると、Output directory file:/tmp/test1 already exists

QUESTIONを与えます同じファイル

MLUtils.saveAsLibSVMFile(D、 "を/ tmp/TEST1")

は、ここでDはLabeledPoint RDDです。ループの中で上記のコマンドを実行すると、出力ディレクトリのファイルを与える:を/ tmp/test1のは、すでに

の質問に存在する既存のLIBSVM形式のファイルにデータを追加する方法はあり

あなたが保存して、あなたのファイルを上書きすることができますhereですが、MLUtils.saveAsLibSVMFile()では処理されません。

MLUtils.saveAsLibSVMFile()では、既存のファイルを上書きすることはできません。

したがって、次のコードは既存のlibsvm形式ファイルにデータを追加しませんが、サイクルごとに取得したデータと以前のサイクルで取得したデータを結合するループです。

from pyspark.mllib.regression import LabeledPoint 
from pyspark.mllib.util import MLUtils 

yourRDD = sc.emptyRDD() # start with an empty RDD 

for elem in xrange(0,3): # your loop 
    rdd_aux = sc.parallelize([LabeledPoint(elem,[elem*2,elem*3])]) #just an example 
    #store and overwrite your new data in an auxiliary RDD at every cycle 
    yourRDD = yourRDD.union(rdd_aux) # combine your RDD_aux with the RDD that you want to make longer at every cycle 

#yourRDD.take(3) 
#[LabeledPoint(0.0, [0.0,0.0]), LabeledPoint(1.0, [2.0,3.0]), LabeledPoint(2.0, [4.0,6.0])] 

MLUtils.saveAsLibSVMFile(yourRDD,"/your/path") 

このように、あなたの前のRDDに新しいRDDを追加して、単一のファイルを保存するのではなく、既存のファイルに新しいデータを追加することができます:1つのファイルを保存します。

関連する問題