2016-10-17 4 views
2

MongoDBデータベースで集約クエリを実行し、結果を書き戻すApache Sparkアプリケーションを試してみたいと思います。私は問題のJava版を試すことができましたが、今はRStudioを使用してR言語に移植する必要があります。私はSparkRドキュメントを、次の試してみましたMongoDBとRStudioを使用したSparkR 2.xアプリケーション

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))) 

##PROBLEM - Is this correct way of setting configuration? 
sparkConfig <- list("spark.driver.memory"="1g","spark.mongodb.input.uri"="mongodb://username:[email protected]:27017/price_subset?authSource=admin","spark.mongodb.output.uri"="mongodb://username:[email protected]:27017/price_subset_output?authSource=admin") 

customSparkPackages <- c("org.mongodb.spark:mongo-spark1-connector_2.11:1.0.0"); 

##Starting Up: SparkSession 
##PROBLEM-1 Is this correct way of initializing spark session ? 
sparkSession <- sparkR.session(appName="MongoSparkConnectorTour",master = "local[*]",enableHiveSupport = FALSE,sparkConfig = sparkConfig,sparkPackages = customSparkPackages) 


##PROBLEM-2 - This complains about being deprecated. How to fix this ? 
sqlContext <- sparkRSQL.init(sparkSession) 

## Save some data 
charactersRdf <- data.frame(list(name=c("Bilbo Baggins", "Gandalf", "Thorin", "Balin", "Kili", "Dwalin", "Oin", "Gloin", "Fili", "Bombur"), 
           age=c(50, 1000, 195, 178, 77, 169, 167, 158, 82, NA))) 

charactersSparkdf <- createDataFrame(sqlContext, charactersRdf) 
#PROBLEM-3 This throws an error - Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
# java.lang.NoClassDefFoundError: com/mongodb/ConnectionString 
write.df(charactersSparkdf, "", source = "com.mongodb.spark.sql.DefaultSource", mode = "overwrite") 

- :私はワークアウトにしようとしています

public static void main(String args[]) { 

SparkConf sparkConf = new SparkConf(true) 
     .setMaster("local[*]") 
     .setSparkHome(SPARK_HOME) 
     .setAppName("SparklingMongoApp") 
     .set("spark.ui.enabled", "false") 
     .set("spark.app.id", APP) 
     .set("spark.mongodb.input.uri", "mongodb://admin:[email protected]:27017/input_collection") 
     .set("spark.mongodb.output.uri", "mongodb://admin:[email protected]:27017/output_collection"); 


JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); 
JavaMongoRDD<Document> javaMongoRDD = MongoSpark.load(javaSparkContext); 

Dataset<Row> dataset = javaMongoRDD.toDF(); 

dataset.createOrReplaceTempView(TEMP_VIEW); 

// a valid spark sql QUERY 
Dataset<Row> computedDataSet = dataset.sqlContext().sql(QUERY); 
MongoSpark.save(computedDataSet); 
javaSparkContext.close(); 

}

等価R/RStudio版 - :を働いたこと

Javaバージョン実行中の例ではまだ運動できませんでした。

期待: -

  1. RStudioで火花セッションを初期化する正しい方法は何ですか。 MongoDB official sampleはSparkShell(私のマシンではハングアップする)としてのみ機能し、廃止予定です。 RStudioで実行できるコードスニペットが必要です。

  2. java.lang.NoClassDefFoundErrorを修正する方法。

SparkR 2.x + MongoDB 3.xコードのワーキングピースのサンプル/リファレンスは高く評価されます。

バージョン: - Apacheのスパーク - 2.0.1 ジャワ - 1.8 のMongoDB - 3 R - 最新

+1

あなたはRStudioで作業している場合は、あなたも自分のSparklyRパッケージ試みを与えるかもしれない(それはとの統合を持っていますRStudioのプレビューリリースのIDE)。 http://spark.rstudio.com/ – Jonathan

答えて

2

が最後にそれが働いてしまいました。 MongoDBのドキュメントにはSpark 1.6の例があり、Spark 2.0.1を実行していたことが分かります。

とにかく、これはRStudioを使用して私のために働いていたものです: -

## Make sure you have SPARK_HOME environment variable set to your spark home director. 
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))) 

spark <- sparkR.session(master="local[*]", appName = "mongoSparkR",enableHiveSupport = FALSE,sparkPackages = c("org.mongodb.spark:mongo-spark-connector_2.11:2.0.0-rc0"),sparkConfig = list(spark.mongodb.input.uri="mongodb://username:[email protected]:27017/database.collection_name?authSource=admin",spark.mongodb.output.uri="mongodb://username:[email protected]:27017/database.collection_name_output?authSource=admin")) 

pricing_df <- read.df(source = "com.mongodb.spark.sql.DefaultSource",x=10000) 
head(pricing_df) 
createOrReplaceTempView(pricing_df,"T_YOUR_TABLE") 

## Obviously this is just a dummy SQL, replace with it yours. 
result_df <- sql("SELECT year(price) as YEAR, month(price) as MONTH , SUM(midPrice) as SUM_PRICING_DATA FROM T_YOUR_TABLE GROUP BY year(price),month(price) ORDER BY year(price),month(price)") 


## stop instance when done. 
sparkR.stop() 

はあなたSPARK_HOME/jarsフォルダに依存するjarを持っていることを確認してください。私はこの作業を取得するために置かれ

エキストラ瓶(バージョンは時間をかけて進化するかもしれません): -

org.mongodb.spark_mongo-spark-connector_2.11-2.0.0-rc0.jar 

org.mongodb_mongo-java-driver-3.2.2.jar 
関連する問題