2017-03-01 3 views
1

私はこのコードを実行しました。私の質問は関数キャストのデータ型です。どのようにデータセットに含まれるすべての列を同じタイムスタンプのexeceptでキャストできますか?列タイムスタンプ以外のすべての列に関数avgを適用する。 どうもありがとうすべての列に関数を適用するspark

val df = spark.read.option("header",true).option("inferSchema", "true").csv("C:/Users/mhattabi/Desktop/dataTest.csv") 
val result=df.withColumn("new_time",((unix_timestamp(col("time")) /300).cast("long") * 300).cast("timestamp")) 
result("value").cast("float")//here the first question 
val finalresult=result.groupBy("new_time").agg(avg("value")).sort("new_time")//here the second question about avg 
finalresult.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("C:/mydata.csv") 
+0

キャストするすべての列に 'withColumn'を追加できませんか? 'agg'の中にある' 'avg''と同じくらい多くの' 'avg''は? – Mariusz

+0

@マリウスズデータセットが非常に大きく、多くの列があるという手続きは、列の時間以外のすべての列を取った – user7394882

答えて

0

これはpysparkで実装するのは非常に簡単ですが、私はScalaのコードにこれを書き換えしようとしているtoubleに実行...私はあなたが何とかそれを管理することを願っています。

from pyspark.sql.functions import * 
df = spark.createDataFrame([(100, "4.5", "5.6")], ["new_time", "col1", "col2"]) 
columns = [col(c).cast('float') if c != 'new_time' else col(c) for c in df.columns] 
aggs = [avg(c) for c in df.columns if c != 'new_time'] 
finalresult = df.select(columns).groupBy('new_time').agg(*aggs) 
finalresult.explain() 

*HashAggregate(keys=[new_time#0L], functions=[avg(cast(col1#14 as double)), avg(cast(col2#15 as double))]) 
+- Exchange hashpartitioning(new_time#0L, 200) 
    +- *HashAggregate(keys=[new_time#0L], functions=[partial_avg(cast(col1#14 as double)), partial_avg(cast(col2#15 as double))]) 
     +- *Project [new_time#0L, cast(col1#1 as float) AS col1#14, cast(col2#2 as float) AS col2#15] 
     +- Scan ExistingRDD[new_time#0L,col1#1,col2#2] 
関連する問題