2015-10-01 8 views
5

私はSpark、SparkR、そして一般にすべてのHDFS関連の技術に慣れています。createDataFrameのSparkRボトルネック?

Sys.setenv(SPARK_HOME="/private/tmp/spark-1.5.0-bin-hadoop2.6") 
.libPaths("/private/tmp/spark-1.5.0-bin-hadoop2.6/R/lib") 
require('SparkR') 
require('data.table') 

sc <- sparkR.init(master="local") 
sqlContext <- sparkRSQL.init(sc) 
hiveContext <- sparkRHive.init(sc) 

n = 1000 
x = data.table(id = 1:n, val = rnorm(n)) 

Sys.time() 
xs <- createDataFrame(sqlContext, x) 
Sys.time() 

コードがすぐに実行します。私は1.5.0をスパークとSparkRでいくつかの簡単なコードを実行し、最近インストールしました。しかし、それをn = 1000000に変更すると、約4分かかります(2回の間にはSys.time()が呼び出されます)。ポート4040のコンソールでこれらのジョブをチェックすると、n = 1000のジョブの継続時間は0.2秒、ジョブはn = 1000000の0.3秒です。私は何か間違っているのですか?

+0

途中で予期せぬ問題が発生しましたが(私がデータフレーム列として置くことができる奇妙なことを忘れていることは言及していませんが)、これは1.6.0で解決する必要があります:[SPARK-11086](https://issues.apache.org/jira/browse/SPARK-11086) – zero323

答えて

4

あなたは特に間違ったことはしていません。

  1. createDataFrame現在実装されているように(スパーク1.5.1)は遅いです。これは、SPARK-8277に記載されている既知の問題です。
  2. 現在の実装はdata.tableでうまくいきません。
  3. ベースRは比較的遅い。スマートな人は、バグではなく機能だと言いますが、それはまだ検討するべきことです。昔ながらのdata.frame代わりdata.table

    • 使用を:SPARK-8277が解決された

までずっとあなたが行うことができますが、そこに二つのオプションは、あなたが試すことができない存在です。ディスクにdata.table

dt <- data.table::fread("flights.csv") 
microbenchmark::microbenchmark(createDataFrame(sqlContext, dt), times=3) 

## Unit: seconds   
##        expr  min  lq  mean median 
## createDataFrame(sqlContext, dt) 378.8534 379.4482 381.2061 380.043 
##  uq  max neval 
## 382.3825 384.722  3 
  • 書き込みに比べ

    df <- read.csv("flights.csv") 
    microbenchmark::microbenchmark(createDataFrame(sqlContext, df), times=3) 
    
    ## Unit: seconds 
    ##        expr  min  lq  mean median 
    ## createDataFrame(sqlContext, df) 96.41565 97.19515 99.08441 97.97465 
    ##  uq  max neval 
    ## 100.4188 102.8629  3 
    

    と狂気のようR.との直接相互作用することなく、データフレームをスパークするために直接データをロードするためにspark-csvを使用します。フライトデータセット(227496行、14列)を使用して、それは音として:

    dt <- data.table::fread("flights.csv") 
    
    write_and_read <- function() { 
        write.csv(dt, tempfile(), row.names=FALSE) 
        read.df(sqlContext, "flights.csv", 
         source = "com.databricks.spark.csv", 
         header = "true", 
         inferSchema = "true" 
        ) 
    } 
    
    ## Unit: seconds 
    ##    expr  min  lq  mean median 
    ## write_and_read() 2.924142 2.959085 2.983008 2.994027 
    ##  uq  max neval 
    ## 3.01244 3.030854  3 
    
  • 私は再ておりません最初にRで扱うことができるデータをSparkにプッシュするのは理にかなっていますが、それには興味はありません。

    編集

    この問題はスパーク1.6.0にSPARK-11086によって解決されなければなりません。

    +0

    あなたの最後のコメントが好きです! :) – eliasah

    +0

    ええと...私はここに偏っているかもしれませんが、SparkRがRからの楽しみの大部分を取り、非常にリターンを与えるような気がします。 Scala APIは、比較的小さなデータでも、特にBreezeを超えて慣用的な代替手段がない場合でも、十分に検討する価値があります。 Pythonでは50-50です。しかし、SparkRは少し不器用なデータベースドライバのように感じています:) – zero323

    +0

    なぜ2点目を詳述できますか? data.tableはdata.frameであり、data.frameに似たカラムにアクセスする方法があります。それで少し戸惑う。また、あなたの第3のポイントでは、何が比較的遅い?この文脈でどのような操作をしていますか? – Arun