2017-02-13 15 views
1

私は、 "id"、 "timestamp"、 "action"、 "value"、 "location"という列のテーブルを含むCSV形式のファイルを持っています。 は、私は、テーブルの各列に関数を適用すると、次のように私はすでにRのコードを書いた:SparkRの各行に関数を適用する方法は?

user <- read.csv(file_path,sep = ";") 
num <- nrow(user) 
curLocation <- "1" 
for(i in 1:num) { 
    row <- user[i,] 
    if(user$action != "power") 
     curLocation <- row$value 
    user[i,"location"] <- curLocation 
} 

Rスクリプトが正常に動作し、今私はSparkRそれを適用したいです。しかし、私はSparkRのi番目の行に直接アクセスすることができず、SparkR documentationのすべての行を操作する関数を見つけることができませんでした。

Rスクリプトと同じ効果を得るにはどの方法を使用しますか?また

は、@chateaurによってアドバイスとして、私は次のようにdapply関数を使用してコードすることを試みた:

curLocation <- "1" 
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string")) 
setLocation <- function(row, curLoc) { 
    if(row$Action != "power|battery|level"){ 
     curLoc <- row$Value 
    } 
    row$Location <- curLoc 
} 
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema) 
head(bw) 

それからエラーを得た: error message

I警告メッセージ見上げ条件が長さ> 1で、最初の要素のみが使用されますと私は何かを見つけたhttps://stackoverflow.com/a/29969702/4942713。それは、dapply機能パラメータは私のデータフレームの代わりに、1つの行パーティション全体を表しているかどうか、私は疑問に思う作ったのですか?たぶんdapply機能は望ましい解決策ではないでしょうか?

後で、@chateaurの指示に従って機能を変更しようとしました。 dapplyを使用する代わりに、dapplyCollectを使用して、スキーマを指定する労力を節約できました。できます!

changeLocation <- function(partitionnedDf) { 
    nrows <- nrow(partitionnedDf) 
    curLocation <- "1" 
    for(i in 1:nrows){ 
     row <- partitionnedDf[i,] 
     if(row$action != "power") { 
      curLocation <- row$value 
     } 
    partitionnedDf[i,"location"] <- curLocation 
    } 
    partitionnedDf 
} 

bw <- dapplyCollect(user, changeLocation) 
+0

あなたはdplyrよりsparklyr(同じ構文を使用することができます) –

+0

@DimitriPetrenko SparkRを使用する必要がある場合はどうすればよいですか? SparkRはその効果を達成できますか? – Scorpion775

答えて

2

Scorpion775、

あなたsparkRコードを共有する必要があります。 RとsparkRでデータが同じように操作されないことを忘れないでください。

http://spark.apache.org/docs/latest/sparkr.html

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA") 

その後、あなたはここにdapply機能を見ることができます:https://spark.apache.org/docs/2.1.0/api/R/dapply.htmlここ

は実施例である:

changeLocation <- function(partitionnedDf) { 
    nrows <- nrow(partitionnedDf) 
    curLocation <- as.integer(1) 

    # Loop over each row of the partitionned data frame 
    for(i in 1:nrows){ 
     row <- partitionnedDf[i,] 

     if(row[1] != "power") { 
      curLocation <- row[2] 
     } 
     partitionnedDf[i,3] <- curLocation 
    } 

    # Return modified data frame 
    partitionnedDf 
} 

# Load data 
df <- read.df("data.csv", "csv", header="false", inferSchema = "true") 

head(collect(df)) 

# Define schema of dataframe 
schema <- structType(structField("action", "string"), structField("value", "integer"), 
        structField("location", "integer")) 

# Change location of each row      
df2 <- dapply(df, changeLocation, schema) 

head(df2) 
+0

私はdapply関数を見て、 "SparkDataFrameの各パーティションに**関数**を適用する"のに使われていることを知りました。私の理解から、_partition_という概念は_row_とは関係がありません。私の関心事は、SparkDataFrameに**機能**を書き込む方法を知らないからです。現在私は**機能を実装する方法しか知りません**私はSparkRではなくRにしたいです。アドバイスをいただけますか? – Scorpion775

+0

私は火花のエキスパートではありませんが、パーティションはクラスタ上に広がるように分割されていると思います。上記の例を試して、それがあなたの必要性に合っているかどうか教えてください。 – chateaur

+0

アドバイスをいただきありがとうございます。私はあなたの指示に従おうとしましたが、質問に示されているようにエラーがありました。 – Scorpion775

関連する問題