2016-05-18 12 views
3

Tuple2内のフィールドにアクセスしようとしていますが、コンパイラがエラーを返しています。ソフトウェアは、カフカのトピック内でケースクラスをプッシュしようとすると、スパークストリーミングを使用して復旧したいので、機械学習アルゴリズムをフィードしてmongoインスタンス内に結果を保存することができます。Tuple2内のフィールドにアクセスする際のエラーについて

解決済み!

私は最終的に私は最終的なソリューションを投稿するつもりです、私の問題を解決しました:

これはgithubのプロジェクトです:

https://github.com/alonsoir/awesome-recommendation-engine/tree/develop 

build.sbt

name := "my-recommendation-spark-engine" 

version := "1.0-SNAPSHOT" 

scalaVersion := "2.10.4" 

val sparkVersion = "1.6.1" 

val akkaVersion = "2.3.11" // override Akka to be this version to match the one in Spark 

libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka_2.10" % "0.8.1" 
    exclude("javax.jms", "jms") 
    exclude("com.sun.jdmk", "jmxtools") 
    exclude("com.sun.jmx", "jmxri"), 
//not working play module!! check 
//jdbc, 
//anorm, 
//cache, 
// HTTP client 
"net.databinder.dispatch" %% "dispatch-core" % "0.11.1", 
// HTML parser 
"org.jodd" % "jodd-lagarto" % "3.5.2", 
"com.typesafe" % "config" % "1.2.1", 
"com.typesafe.play" % "play-json_2.10" % "2.4.0-M2", 
"org.scalatest" % "scalatest_2.10" % "2.2.1" % "test", 
"org.twitter4j" % "twitter4j-core" % "4.0.2", 
"org.twitter4j" % "twitter4j-stream" % "4.0.2", 
"org.codehaus.jackson" % "jackson-core-asl" % "1.6.1", 
"org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test", 
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1" , 
"org.apache.spark" % "spark-core_2.10" % "1.6.1" , 
"org.apache.spark" % "spark-streaming_2.10" % "1.6.1", 
"org.apache.spark" % "spark-sql_2.10" % "1.6.1", 
"org.apache.spark" % "spark-mllib_2.10" % "1.6.1", 
"com.google.code.gson" % "gson" % "2.6.2", 
"commons-cli" % "commons-cli" % "1.3.1", 
"com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1", 
// Akka 
"com.typesafe.akka" %% "akka-actor" % akkaVersion, 
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion, 
// MongoDB 
"org.reactivemongo" %% "reactivemongo" % "0.10.0" 
) 

packAutoSettings 

//play.Project.playScalaSettings 

カフカプロデューサー

package example.producer 

import play.api.libs.json._ 
import example.utils._ 
import scala.concurrent.Future 
import example.model.{AmazonProductAndRating,AmazonProduct,AmazonRating} 
import example.utils.AmazonPageParser 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 


/** 
args(0) : productId 
args(1) : userdId 

Usage: ./amazon-producer-example 0981531679 someUserId 3.0 
*/ 
object AmazonProducerExample { 

def main(args: Array[String]): Unit = { 

val productId = args(0).toString 
val userId = args(1).toString 
val rating = args(2).toDouble 
val topicName = "amazonRatingsTopic" 

val producer = Producer[String](topicName) 

//0981531679 is Scala Puzzlers... 
AmazonPageParser.parse(productId,userId,rating).onSuccess { case amazonRating => 
    //Is this the correct way? the best performance? possibly not, what about using avro or parquet? How can i push data in avro or parquet format? 
    //You can see that i am pushing json String to kafka topic, not raw String, but is there any difference? 
    //of course there are differences... 
    producer.send(Json.toJson(amazonRating).toString) 
    //producer.send(amazonRating.toString) 
    println("amazon product with rating sent to kafka cluster..." + amazonRating.toString) 
    System.exit(0) 
} 

} 
} 

これは、必要なケースクラスの定義(更新)、ファイルがmodels.scala命名されている。

package example.model 

import play.api.libs.json.Json 
import reactivemongo.bson.Macros 

case class AmazonProduct(itemId: String, title: String, url: String, img: String, description: String) 
case class AmazonRating(userId: String, productId: String, rating: Double) 

case class AmazonProductAndRating(product: AmazonProduct, rating: AmazonRating) 

// For MongoDB 
object AmazonRating { 
implicit val amazonRatingHandler = Macros.handler[AmazonRating] 
implicit val amazonRatingFormat = Json.format[AmazonRating] 
//added using @Yuval tip 
lazy val empty: AmazonRating = AmazonRating("-1", "-1", -1d) 
} 

これでスパークストリーミングプロセスの完全なコード:

package example.spark 

import java.io.File 
import java.util.Date 

import play.api.libs.json._ 
import com.google.gson.{Gson,GsonBuilder, JsonParser} 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.functions._ 

import com.mongodb.casbah.Imports._ 
import com.mongodb.QueryBuilder 
import com.mongodb.casbah.MongoClient 
import com.mongodb.casbah.commons.{MongoDBList, MongoDBObject} 

import reactivemongo.api.MongoDriver 
import reactivemongo.api.collections.default.BSONCollection 
import reactivemongo.bson.BSONDocument 

import org.apache.spark.streaming.kafka._ 
import kafka.serializer.StringDecoder 
import example.model._ 

import example.utils.Recommender 

/** 
* Collect at least the specified number of json amazon products in order to feed recomedation system and feed mongo instance with results. 

Usage: ./amazon-kafka-connector 127.0.0.1:9092 amazonRatingsTopic 

on mongo shell: 

use alonsodb; 
db.amazonRatings.find(); 
*/ 
object AmazonKafkaConnector { 

private var numAmazonProductCollected = 0L 
private var partNum = 0 
private val numAmazonProductToCollect = 10000000 

//this settings must be in reference.conf 
private val Database = "alonsodb" 
private val ratingCollection = "amazonRatings" 
private val MongoHost = "127.0.0.1" 
private val MongoPort = 27017 
private val MongoProvider = "com.stratio.datasource.mongodb" 

private val jsonParser = new JsonParser() 
private val gson = new GsonBuilder().setPrettyPrinting().create() 

private def prepareMongoEnvironment(): MongoClient = { 
    val mongoClient = MongoClient(MongoHost, MongoPort) 
    mongoClient 
} 

private def closeMongoEnviroment(mongoClient : MongoClient) = { 
    mongoClient.close() 
    println("mongoclient closed!") 
} 

private def cleanMongoEnvironment(mongoClient: MongoClient) = { 
    cleanMongoData(mongoClient) 
    mongoClient.close() 
} 

private def cleanMongoData(client: MongoClient): Unit = { 
    val collection = client(Database)(ratingCollection) 
    collection.dropCollection() 
} 

def main(args: Array[String]) { 
// Process program arguments and set properties 

if (args.length < 2) { 
    System.err.println("Usage: " + this.getClass.getSimpleName + " <brokers> <topics>") 
    System.exit(1) 
} 

val Array(brokers, topics) = args 

println("Initializing Streaming Spark Context and kafka connector...") 
// Create context with 2 second batch interval 
val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector") 
           .setMaster("local[4]") 
           .set("spark.driver.allowMultipleContexts", "true") 

val sc = new SparkContext(sparkConf) 
val sqlContext = new SQLContext(sc) 
sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 
//this checkpointdir should be in a conf file, for now it is hardcoded! 
val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint" 
ssc.checkpoint(streamingCheckpointDir) 

// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
println("Initialized Streaming Spark Context and kafka connector...") 

//create recomendation module 
println("Creating rating recommender module...") 
val ratingFile= "ratings.csv" 
val recommender = new Recommender(sc,ratingFile) 
println("Initialized rating recommender module...") 
//THIS IS THE MOST INTERESTING PART AND WHAT I NEED! 
//THE SOLUTION IS NOT PROBABLY THE MOST EFFICIENT, BECAUSE I HAD TO 
//USE DATAFRAMES, ARRAYs and SEQs BUT IS FUNCTIONAL! 
try{ 
messages.foreachRDD(rdd => { 
val count = rdd.count() 
if (count > 0){ 
    val json= rdd.map(_._2) 
    val dataFrame = sqlContext.read.json(json) //converts json to DF 
    val myRow = dataFrame.select(dataFrame("userId"),dataFrame("productId"),dataFrame("rating")).take(count.toInt) 
    println("myRow is: " + myRow) 

    val myAmazonRating = AmazonRating(myRow(0).getString(0), myRow(0).getString(1), myRow(0).getDouble(2)) 
    println("myAmazonRating is: " + myAmazonRating.toString) 
    val arrayAmazonRating = Array(myAmazonRating) 
    //this method needs Seq[AmazonRating] 
    recommender.predictWithALS(arrayAmazonRating.toSeq) 
    }//if 
})  
}catch{ 
    case e: IllegalArgumentException => {println("illegal arg. exception")}; 
    case e: IllegalStateException => {println("illegal state exception")}; 
    case e: ClassCastException  => {println("ClassCastException")}; 
    case e: Exception    => {println(" Generic Exception")}; 
}finally{ 

    println("Finished taking data from kafka topic...") 
} 

ssc.start() 
ssc.awaitTermination() 

println("Finished!") 
} 
} 

はあなたのすべてに感謝し、 @Yuval、@Emecas、@ Riccardo.cardin。私@Yuval

を教え、私は可能な限り明らかになったと思う

def predict(ratings: Seq[AmazonRating]) = { 
    // train model 
    val myRatings = ratings.map(toSparkRating) 
    val myRatingRDD = sc.parallelize(myRatings) 

    val startAls = DateTime.now 
    val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01) 

    val myProducts = myRatings.map(_.product).toSet 
    val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains)) 

    // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations 
    val myUserId = userDict.getIndex(MyUsername) 
    val recommendations = model.predict(candidates.map((myUserId, _))).collect 
    val endAls = DateTime.now 
    val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating) 
    val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds 

    println(s"ALS Time: $alsTime seconds") 
    result 
    } 

//、あなたがより多くのものを必要とするなら、私に教えて、あなたの忍耐に感謝:

Recommender.predict署名方法は次のようになります

答えて

1

診断

IllegalStateExceptionあなたはそのStreamingContext上で動作していることを示唆していますすでにアクティブまたはストップしています。 see details here (lines 218-231)

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported 

コードレビュー

あなたのコードAmazonKafkaConnectorを観察することによって、あなたはと呼ばれる同じDirectStreamオブジェクトの上に別のforeachRDDmapfilterforeachRDDをやっている:messages

一般的なアドバイス:

あなたが実行するタスクのそれぞれのために小さな断片にあなたのロジックを分割することによって、私の友人に機能する:

  • ストリーミング
  • ML勧告
  • 永続
  • など

これは、実装したいSparkパイプラインの理解とデバッグに役立ちます。

1

問題が

def take(num: Int): Array[T] 
を述べたように文は、 Array[T]返す rdd.take(count.toInt)ということです

RDDの最初のnum要素をとります。

RDDには、最初のn個の要素が含まれています。次に、あなたが推測するものとは違って、タイプTuple2のオブジェクトではなく配列です。

あなたは、配列の各要素を印刷したい場合は、配列のすべての要素を持つ単一のStringを得ることがArray種類の方法mkStringに定義を使用することができます。

1

あなたがしようとしているように見えるのは、mapDStreamを超えていることです。 map操作は、タイプAからタイプBへの投影です。AString(Kafkaから受信しています)です。Bは、ケースクラスAmazonRatingです。

のは、あなたのAmazonRatingempty値を追加してみましょう:

case class AmazonRating(userId: String, productId: String, rating: Double) 

object AmazonRating { 
    lazy val empty: AmazonRating = AmazonRating("-1", "-1", -1d) 
} 

今度はJSONsを解析してみましょう:

val messages = KafkaUtils 
    .createDirectStream[String, String, StringDecoder, StringDecoder] 
    (ssc, kafkaParams, topicsSet) 

messages 
     .map { case (_, jsonRating) => 
     val format = Json.format[AmazonRating] 
     val jsValue = Json.parse(record) 
     format.reads(jsValue) match { 
      case JsSuccess(rating, _) => rating 
      case JsError(_) => AmazonRating.empty 
     } 
     .filter(_ != AmazonRating.empty) 
     .foreachRDD(_.foreachPartition(it => recommender.predict(it.toSeq))) 
+0

[OK]が、エラーは単純に彼はタイプ 'Array [T]'のオブジェクトをタイプ 'Tuple2'のオブジェクトと混同しています。 –

+0

@ riccardo.cardinいいえ、彼はカフカからのデータをデシリアライズしなかったので、 'Array [(String、String)]'を持っています。カフカはキーと値の両方を送るので、彼はタプルを持っています。彼はそれを彼のケースクラスに変換するための何もしていません。 –

+0

彼はこのステートメント 'val someMessages = rdd.take(count.toInt)'のために 'Array'を持っています。 –

関連する問題