2016-09-13 7 views
0

HDFS上のファイルに格納された点集合に関連する距離行列を計算した後、分散型(CoordinateMatrix/RowMatrix)で計算された距離行列をMongoDBに格納する必要がありますMongoDB Connector for Apache Spark。これを行うための推奨された方法、またはそのような操作のためのより良いコネクタがありますか?ここでMongoDBにSpark分散行列を格納する

は私のコードの一部です:

val data = sc.textFile("hdfs://localhost:54310/usrp/copy_sample_data.txt") 
val points = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))) 
val indexed = points.zipWithIndex() 
val indexedData = indexed.map{case (value, index) => (index, value)} 
val pairedSamples = indexedData.cartesian(indexedData) 
val dist = pairedSamples.map{case (x,y) => ((x,y),distance(x._2,y._2))}.map{case ((x,y),z) => (((x,y),z,covariance(z)))}  
val entries: RDD[MatrixEntry] = dist.map{case (((x,y),z,cov)) => MatrixEntry(x._1, y._1, cov)} 
val coomat: CoordinateMatrix = new CoordinateMatrix(entries)   

さらに注意することは、私はRDDからスパークでこの行列を作成しました。 RDDからMongodbにデータを保存する方が良いかもしれません。

答えて

1

CoordinateMatrixRowMatrix基本的に、それぞれRDD[MatrixEntry]RDD[Vector]ラッパーされ、両者は比較的にMongoDBに保存することができます。行列座標の場合:

val spark: SparkSession = ??? 
import spark.implicits._ 

// For 1.x 
// val sqlContext: SQLContext = ??? 
// import sqlContext.implicits._ 

val options = Map(
    "uri" -> ??? 
    "database" -> ??? 
) 

val coordMat = new CoordinateMatrix(sc.parallelize(Seq(
    MatrixEntry(1, 3, 1.4), MatrixEntry(3, 6, 2.8)) 
)) 

coordMat.entries.toDF().write 
    .options(options) 
    .option("collection", "coordinates")  
    .format("com.mongodb.spark.sql") 
    .save() 

をシェイプの文書を取得します:

簡単に元の形に戻ってキャストすることができ
{'_id': ObjectId('...'), 'i': 3, 'j': 6, 'value': 2.8} 

:かなり多くの

val entries = spark.read 
    .options(options) 
    .option("collection", "coordinates")  
    .format("com.mongodb.spark.sql") 
    .load() 
    .drop("_id") 
    .schema(...) 
    .as[MatrixEntry] 

new CoordinateMatrix(entries.rdd) 

同じことをすることができますRowMatrixのために行われますが、もう少し作業が必要です(密集配列または疎タプル(size, indices, values)としてVectorsを表す)。

どちらの場合も(CoordinateMatrixRowMatrix)、行列の形状に関する情報が失われます。

+0

ありがとうございます。私はこのエラーを受け取ります: "値toDFはorg.apache.spark.rdd.RDDのメンバーではありません[.... MatrixEntry] – EdgeRover

+0

' import spark.implicits._'ここで 'spark'は' SparkSession'オブジェクトです。 1.xは 'SQLContext'を使用します – zero323

+0

はい、私は" import org.apache.spark.sql.SQLContext._ "を追加して、メインクラスの外にcoorMatの構造を移動しましたが、問題はまだ残っています – EdgeRover

関連する問題