2017-01-30 10 views
0

既存のデータフレーム行のhashModの値を持つデータフレームを作成する列を追加します。以下の例では、1つの特定の列「データ」のハッシュに対して同じ結果を得ることができますが、データフレーム行全体(allcolumns)に対して同じ結果を得るにはどうすればよいでしょうか?既存のデータフレーム行のhashModを値に持つデータフレームを作成する列を追加します。

object Container { 
    case class intContainer(data: Int) 
} 


val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._  
val getBucket = udf((data: Object) => data.hashCode() %10) 
val schema = StructType(List(StructField("age", IntegerType))) 

val userList = List((23),(24), (25), (57)) 
val df1:RDD[Container.intContainer] = sc.parallelize(userList).map(x=> Container.intContainer(x)) 
val df = df1.toDF() 
df.registerTempTable("dfcount") 
val countdf = sqlContext.sql("select data,data+1 as count, current_timestamp() as time from dfcount") 
val xx = countdf.withColumn("bucket_id", getBucket(col("data"))) 
+0

dfには可能な列の数がいくつありますか? – mrsrinivas

答えて

1

以下のスニペットでは、バケット値を得るためにハッシュコードが合計された列の配列をとるudfを採用しています。この関数は、任意の数の列および任意のスキーマに対して機能します。

import sqlContext.implicits._ 
import org.apache.spark.sql.functions.udf 
def hasher(data: AnyRef *) = (data.map(_.hashCode).sum % 10) 
val getBucket = udf(hasher _) 
val df = sc.parallelize(('a' to 'z').map(_.toString) zip (1 to 30)).toDF("c1","c2") 
df.withColumn("bucket", getBucket(array(df.columns.map(df.apply): _*))).show() 
+---+---+------+ 
| c1| c2|bucket| 
+---+---+------+ 
| a| 1|  6| 
| b| 2|  8| 
| c| 3|  0| 
| d| 4|  2| 
| e| 5|  4| 
| f| 6|  6| 
| g| 7|  8| 
| h| 8|  0| 
| i| 9|  2| 
| j| 10|  3| 
| k| 11|  5| 
| l| 12|  7| 
| m| 13|  9| 
| n| 14|  1| 
| o| 15|  3| 
| p| 16|  5| 
| q| 17|  7| 
| r| 18|  9| 
| s| 19|  1| 
| t| 20|  4| 
+---+---+------+ 
関連する問題