2016-05-05 6 views
0

私はフォーマット以下のフラット・ファイルにMahoutののrowsimilarityを実行するために管理している:実行MahoutのRowSimilarity推薦

項目-ID TAG1タグ-2 TAG3

これは経由を実行する必要がありますcliと出力は再びフラットファイルです。私はMongoDBからデータを読み込み(他のDBも利用できるようにします)、DBに出力をダンプし、その後私たちのシステムから選ぶことができるようにしたいと思います。

私はここ数日のために研究し、物事の下に見つけた:

  • は、データ
  • を処理するためにIndexedDataSetオブジェクトはに出力を変換し、それを渡しRowSimilarity
  • を実装Scalaのコードを記述する必要があります必要な形式(json/csv)

私がまだ理解していないのは、DBからIndexedDataSetにデータをインポートする方法です。また、私はRDD形式について読んだことがありますが、RowSimilarityコードで使用できるRDDにjsonデータを変換する方法をまだ理解できません。

tl; dr:MongoDBデータをmahout/spark行の類似度で処理できるように変換する方法はありますか。

EDIT1:私は、このリンクからRDDにMongodataを変換いくつかのコードを発見した:https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage#scala-example

今私はそれがSimilarityAnalysis.rowSimilarityIDSに渡すことができるようIndexedDatasetに変換するのに役立つ必要があります。

TL; DR:どのように私はIndexedDatasetにRDDを変換しない

以下
+0

あなたの質問がありますか? – eliasah

+0

"tl; dr" – user3295878

答えて

0

が答えです:

import org.apache.hadoop.conf.Configuration 
import org.apache.mahout.math.cf.SimilarityAnalysis 
import org.apache.mahout.math.indexeddataset.Schema 
import org.apache.mahout.sparkbindings 
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark 
import org.apache.spark.rdd.RDD 
import org.bson.BSONObject 
import com.mongodb.hadoop.MongoInputFormat 


object SparkExample extends App { 
    implicit val mc = sparkbindings.mahoutSparkContext(masterUrl = "local", appName = "RowSimilarity") 
    val mongoConfig = new Configuration() 
    mongoConfig.set("mongo.input.uri", "mongodb://hostname:27017/db.collection") 

    val documents: RDD[(Object, BSONObject)] = mc.newAPIHadoopRDD(
    mongoConfig, 
    classOf[MongoInputFormat], 
    classOf[Object], 
    classOf[BSONObject] 
) 

    val documents_Array: RDD[(String, Array[String])] = documents.map(
    doc1 => (
     doc1._2.get("product_id").toString(), 
     doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-").mkString(" ")) 
    ) 
) 

    val new_doc: RDD[(String, String)] = documents_Array.flatMapValues(x => x) 
    val myIDs = IndexedDatasetSpark(new_doc)(mc) 

    val readWriteSchema = new Schema(
    "rowKeyDelim" -> "\t", 
    "columnIdStrengthDelim" -> ":", 
    "omitScore" -> false, 
    "elementDelim" -> " " 
) 
    SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://hadoop:9000/mongo-hadoop-rowsimilarity", readWriteSchema)(mc) 

} 

はbuild.sbt:

name := "scala-mongo" 
version := "1.0" 
scalaVersion := "2.10.6" 
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1" 
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" 
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2" 

libraryDependencies ++= Seq(
    "org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"), 
    "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test" 
) 
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2" 
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2" 

resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/" 
resolvers += Resolver.mavenLocal 

私はへmongo-hadoopを使用しましたMongoからデータを取得して使用する。私のデータは配列を持っていたので、flatMapValuesを使ってそれを平坦化し、正しい出力のためにIDSに渡さなければなりませんでした。

PS:このQ & Aはデータの取得と処理の全範囲をカバーするため、linked questionではなく、ここに回答を掲載しました。

関連する問題