0

私はtoDF TempTableにいくつかの解析を保存しようとしていますが、次のエラーを受け取ります。 "215:error:value toDFはDoubleのメンバーではありません。 私はCassandraテーブルのデータを読んでいます。私はいくつかの計算をしています。これらの結果を一時テーブルに保存します。 私はスカラーで新しいです、誰かが私を喜ばせることができますか? 私のコードスカラー:結果をtoDfの一時テーブルに保存

case class Consumo(consumo:Double, consumo_mensal: Double, mes: org.joda.time.DateTime,ano: org.joda.time.DateTime, soma_pf: Double,empo_gasto: Double); 

object Analysegridata{ 

val conf = new SparkConf(true) 

.set("spark.cassandra.connection.host","127.0.0.1").setAppName("LiniarRegression") 
.set("spark.cassandra.connection.port", "9042") 
.set("spark.driver.allowMultipleContexts", "true") 
.set("spark.streaming.receiver.writeAheadLog.enable", "true"); 
val sc = new SparkContext(conf); 

val ssc = new StreamingContext(sc, Seconds(1)) 

val sqlContext = new org.apache.spark.sql.SQLContext(sc); 
val checkpointDirectory = "/var/lib/cassandra/data" 
ssc.checkpoint(checkpointDirectory) // set checkpoint directory 

// val context = StreamingContext.getOrCreate(checkpointDirectory)  
import sqlContext.implicits._ 
JavaSparkContext.fromSparkContext(sc); 

def rddconsumo(rddData: Double): Double = { 

val rddData: Double = { 
    implicit val data = conf 
    val grid = sc.cassandraTable("smartgrids", "analyzer").as((r:Double) => (r)).collect 

def goto(cs: Array[Double]): Double = { 

    var consumo = 0.0; 
    var totaldias = 0; 
    var soma_pf = 0.0; 
    var somamc = 0.0; 
    var tempo_gasto = 0.0; 
    var consumo_mensal = 0.0; 
    var i=0 
for (i <- 0 until grid.length) {  
    val minutos = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MINUTE"); 
    val horas = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol","HOUR_OF_DAY"); 
    val dia = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "DAY_OF_MONTH"); 
    val ano = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "YEAR"); 
    val mes = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MONTH"); 
    val potencia = sc.cassandraTable("smartgrids","analyzer_temp").select("n_pf1", "n_pf2", "n_pf3") 

def convert_minutos (minuto : Int) : Double ={ 
    minuto/60 
} 
    dia.foreach (i => { 

    def adSum(potencia: Array[Double]) = { 

    var i=0; 
     while (i < potencia.length) { 
     soma_pf += potencia(i); 
     i += 1; 
     soma_pf; 
     println("Potemcia =" + soma_pf) 
    } 
} 
    def tempo(minutos: Array[Int]) = { 
     var i=0; 
     while (i < minutos.length) { 
     somamc += convert_minutos(minutos(i)) 
     i += 1; 
     somamc 
    } 
    } 

    def tempogasto(horas: Array[Int]) = { 
     var i=0; 
     while (i < horas.length) { 
     tempo_gasto = horas(i) + somamc; 
     i += 1; 
     tempo_gasto; 
     println("Temo que o aparelho esteve ligado =" + tempo_gasto) 
    } 
} 

def consumof(dia: Array[Int]) = { 
    var i=0; 
    while (i < dia.length) { 
     consumo = soma_pf * tempo_gasto; 
     i += 1; 
     consumo; 
     println("Consumo diario =" + consumo)  
    } 
    } 
}) 

mes.foreach (i => { 

def totaltempo(dia: Array[Int]) = { 
    var i = 0; 
    while(i < dia.length){ 
     totaldias += dia(i); 
     i += 1; 
     totaldias; 
     println("Numero total de dias =" + totaldias) 
    } 
} 
def consumomensal(mes: Array[Int]) = { 
    var i = 0; 
    while(i < mes.length){ 
     consumo_mensal = consumo * totaldias; 
     i += 1; 
    consumo_mensal; 
    println("Consumo Mensal =" + consumo_mensal); 
    } 
} 
}) 

} 
    consumo; 
    totaldias; 
    consumo_mensal; 
    soma_pf; 
    tempo_gasto; 
    somamc 

} 

rddData 

    } 
    rddData.toDF().registerTempTable("rddData") 
} 
     ssc.start() 
     ssc.awaitTermination() 




error: value toDF is not a member of Double" 
+0

完全なエラースタックを投稿してください!これは役に立ちます –

+0

:260:エラー:値toDFはDouble rddData.toDF()のメンバーではありませんregisterTempTable( "rddData") –

答えて

0

それは(あまりにも多くのコード、最小限例を提供してみてください)あなたが正確に何をしようとしてではなく不明だが、いくつかの明白な問題がある:

  1. は、 rddDataタイプがDoubleの場合:RDD[Double](Double値の分散コレクション)のように見えます。単一のDouble値をテーブルとして保存しようとすると意味がなく、実際には機能しません(toDFRDDで呼び出すことができますが、コンパイラは警告として、Doubleにはありません)。
  2. あなたはデータcollect():あなたは、RDDをロードするいくつかの操作を使用して、それを変換した後、テーブルとして保存したい場合は - collect()おそらくRDDに呼ばれるべきではありません。 collect()は、クラスタ全体に分散したすべてのデータを単一の「ドライバ」マシン(このコードを実行するマシン)に送ります。そのあとでクラスタを利用せず、RDDデータ構造も使用しないので、 toDFを使用してデータをDataFrameに変換します。
関連する問題