2017-06-03 8 views
1

残念ながら、クライアントによって非常に乱雑で非常に大きなテーブル(csv)が与えられました。これは、ワイドフォーマットである: '(Cassandraでワイドテーブルをロングフォーマットに変換する

例として、列は次のとおりです。

Name, Date, Usage_Hr1, Usage_Hr2, ..., Usage_Hr24, ... lots more columns 

私は通常ちょうどRに.CSVをロードし、tidyrパッケージからgatherを使用しますが、データが大きすぎるでしょう。私はCOPYがカサンドラに私のテーブルを編したら(名前と日付にPRIMARY KEYを設定し)、だから私の質問がある

私はsparklyrにデータをロードすると考えられてきましたが、何のgather機能がsparklyrにありませんまだ... 、ホwこれらのcolを長い形式に変換できますか?私はちょうど幸運の外でしょうか?私はDB男btwではないので、私はここでは考えていない。

ノート私はカッサンドラの最新バージョンを使用していますが、私の現在のテーブルは約1,000万行です。

+0

1000万行と列数 – Uwe

+0

46個の列がデータセット内にあります。理想的には、顧客情報のためだけにテーブルを作成し、その後は別のテーブルを使用することができます – Zafar

答えて

1

スパークでexplode機能を使用すると、but compared to support APIs、これを行うとsparklyrが関係します。

初期化及びデータ例:

library(stringi) 

sc <- spark_connect("local[*]") 
df <- data.frame(A = c("a", "b", "c"), B = c(1, 3, 5), C = c(2, 4, 6)) 
sdf <- copy_to(sc, df, overwrite =TRUE) 

ヘルパー機能:

#' Given name, return corresponding SQL function 
sqlf <- function(f) function(x, ...) { 
    invoke_static(sc, "org.apache.spark.sql.functions", f, x, ...) 
} 

メルト機能:

#' @param df tbl_spark 
#' @param sc spark_connection 
#' @param id_vars id columns 
#' 
melt <- function(df, sc, id_vars, value_vars = NULL, 
    var_name = "key", value_name = "value") { 
    # Alias for the output view 
    alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep = "_") 
    # Get session and JVM object 
    spark <- spark_session(sc) 
    jdf <- spark_dataframe(df) 

    # Convert characters to JVM Columns 
    j_id_vars <- lapply(id_vars, sqlf("col")) 

    # Combine columns into array<struct<key,value>> and explode 
    exploded <- sqlf("explode")(sqlf("array")(lapply(value_vars, function(x) { 
    key <- sqlf("lit")(x) %>% invoke("alias", var_name) 
    value <- sqlf("col")(x) %>% invoke("alias", value_name) 
    sqlf("struct")(list(key, value)) 
    }))) 

    # expand struct<..., struct<key, value>> into struct<..., key, value> 
    exprs <- lapply(
    c(id_vars, paste("col", c(var_name, value_name), sep = ".")), 
    sqlf("col")) 

    # Explode and register as temp table 
    jdf %>% 
    invoke("withColumn", "col", exploded) %>% 
    invoke("select", exprs) %>% 
    invoke("createOrReplaceTempView", alias) 

    dplyr::tbl(sc, alias) 
} 

使用例:

melt(sdf, sc, "A", c("B", "C")) 

## Source: query [6 x 3] 
## Database: spark connection master=local[*] app=sparklyr local=TRUE 
## 
## # A tibble: 6 x 3 
##  A key value 
## <chr> <chr> <dbl> 
## 1  a  B  1 
## 2  a  C  2 
## 3  b  B  3 
## 4  b  C  4 
## 5  c  B  5 
## 6  c  C  6 
関連する問題