2016-10-17 14 views
0

スパークデータフレームデータをcouchbaseに書きたいと思います。スパークデータフレームをcouchbaseに保存

double[] val=new double[3]; 
SparkContext sc = new SparkContext(new SparkConf().setAppName("sql").setMaster("local").set("com.couchbase.nodes", "url_of_couchbase").set("com.couchbase.bucket.bucket_name", "password")); 
SQLContext sql = new SQLContext(sc); 
DataFrame df = sql.read().json("sample.json"); 
df.registerTempTable("sample"); 

DataFrame men=sql.sql("select mean(imp_recall_interval) from sample"); 
Row[] r=men.collect(); 
val[0]=Double.parseDouble(r[0].toString().replace("[", "").replace("]", "").trim()); 
JsonDocument doc1=JsonDocument.create("docId", JsonObject.create().put("mean", val[0])); 
System.out.println("Data Saved"); 
JsonArrayDocument jrd=JsonArrayDocument.create("imp_recall_timeinterval_mean_median_sd", JsonArray.from("more", "content", "in", "here")); 

をしかし、私はこれらをparrallelizeしようとすると、私はそれを行うことができないです - :このために、私は次のようにそれを行うにしようとしています。

sc.parrallelize(Seq(doc1,jrd)); 

このデータをcouchbaseに保存する方法を教えてください。または、私がCouchbaseでドキュメントを作成して保存する方法を指定してください。

答えて

0

これを試してみてください。

import java.util.ArrayList; 
import java.util.List; 
import com.couchbase.spark.japi.CouchbaseDocumentRDD; 
import com.couchbase.client.java.document.AbstractDocument; 


JavaSparkContext jsc = new JavaSparkContext(sc); 
SQLContext sql = new SQLContext(jsc); 

JsonDocument doc1; 
JsonArrayDocument jrd; 

List<AbstractDocument> list = new ArrayList<AbstractDocument>(); 
list.add(doc1); 
list.add(jrd); 

JavaRDD<AbstractDocument> jRDD = jsc.parallelize(list); 
CouchbaseDocumentRDD<AbstractDocument> cbRDD = CouchbaseDocumentRDD.couchbaseDocumentRDD(jRDD); 
cbRDD.saveToCouchbase(); 
+0

リストのインポットは何ですか。 –

+0

list.add();ためです。機能が動作していません。 –

+0

上記の回答に輸入品を追加しました。 – abaghel

関連する問題