2016-10-17 9 views
2

Spark Streamingでは、新しいメッセージが受信されるたびに、この新しいメッセージに基づいてsthを予測するためにモデルが使用されます。しかし、時間が経つにつれて、モデルが何らかの理由で変更することができますので、新しいメッセージが入って来たときに私がモデルを再ロードする。私はこのコードを実行すると私のコードがあり、この[Spark Streaming]新しいメッセージが到着するたびにモデルを読み込む方法は?

def loadingModel(@transient sc:SparkContext)={ 
    val model=LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel") 
    model 
} 

var error=0.0 
var size=0.0 
implicit def bool2int(b:Boolean) = if (b) 1 else 0 
def updateState(batchTime: Time, key: String, value: Option[String], state: State[Array[Double]]): Option[(String, Double,Double)] = { 
    val model=loadingModel(sc) 
    val parts = value.getOrElse("0,0,0,0").split(",").map { _.toDouble } 
    val pairs = LabeledPoint(parts(0), Vectors.dense(parts.tail)) 
    val prediction = model.predict(pairs.features) 
    val wrong= prediction != pairs.label 
    error = state.getOption().getOrElse(Array(0.0,0.0))(0) + 1.0*(wrong:Int) 
    size=state.getOption().getOrElse(Array(0.0,0.0))(1) + 1.0 
    val output = (key, error,size) 
    state.update(Array(error,size)) 
    Some(output) 
} 
val stateSpec = StateSpec.function(updateState _) 
    .numPartitions(1) 
setupLogging() 
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 
val topics = List("test").toSet 
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics).mapWithState(stateSpec) 

のように見えますこのような例外になります

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 

詳細が必要な場合はお知らせください。 ありがとうございました!

+0

完全なスタックトレースを投稿できますか?また、map、filterのようなスパーク変換の中で直列化できないクラスを使用しようとしていますか? – Shankar

+0

@Shankarこんにちは、 'loadMode'lを定義せずに' updateState'の外にモデル( 'val model = LogisticRegressionModel.load(sc、"/home/zefu/BIA800/LRModel ")')をロードするだけでうまくいきます。私は問題が 'sc'で来ると思います –

+0

@Shankarと私はそこにコードを追加しました:P –

答えて

0

DStream関数内でモデルを使用すると、sparkはコンテキストオブジェクトを直列化するように見えます(モデルのロード関数はscを使用するため)。コンテキストオブジェクトが直列化できないため、失敗します。 1つの回避策は、DStreamをRDDに変換し、結果を収集して、ドライバでモデルの予測/スコアリングを実行することです。

ストリーミングをシミュレートするために使用されるnetcatユーティリティーは、DStreamをRDDに変換するために次のコードを試しました。それが役立つかどうかを見てください。

val ssc = new StreamingContext(sc,Seconds(10)) 
val lines = ssc.socketTextStream("xxx", 9998) 
val linedstream = lines.map(lineRDD => Vectors.dense(lineRDD.split(" ").map(_.toDouble))) 
val logisModel = LogisticRegressionModel.load(sc, /path/LR_Model") 
linedstream.foreachRDD(rdd => { 
    for(item <- rdd.collect().toArray) { 
    val predictedVal = logisModel.predict(item) 
     println(predictedVal + "|" + item); 
    } 
}) 

集める理解は、ここではスケーラブルではありませんが、あなたはあなたのストリーミングメッセージが任意の間隔の数が少ないと思われる場合、これはおそらくオプションです。これはSpark 1.4.0で可能なことですが、上位バージョンにはおそらくこれが修正されています。

Save ML model for future usage

関連する問題