Spark Streamingでは、新しいメッセージが受信されるたびに、この新しいメッセージに基づいてsthを予測するためにモデルが使用されます。しかし、時間が経つにつれて、モデルが何らかの理由で変更することができますので、新しいメッセージが入って来たときに私がモデルを再ロードする。私はこのコードを実行すると私のコードがあり、この[Spark Streaming]新しいメッセージが到着するたびにモデルを読み込む方法は?
def loadingModel(@transient sc:SparkContext)={
val model=LogisticRegressionModel.load(sc, "/home/zefu/BIA800/LRModel")
model
}
var error=0.0
var size=0.0
implicit def bool2int(b:Boolean) = if (b) 1 else 0
def updateState(batchTime: Time, key: String, value: Option[String], state: State[Array[Double]]): Option[(String, Double,Double)] = {
val model=loadingModel(sc)
val parts = value.getOrElse("0,0,0,0").split(",").map { _.toDouble }
val pairs = LabeledPoint(parts(0), Vectors.dense(parts.tail))
val prediction = model.predict(pairs.features)
val wrong= prediction != pairs.label
error = state.getOption().getOrElse(Array(0.0,0.0))(0) + 1.0*(wrong:Int)
size=state.getOption().getOrElse(Array(0.0,0.0))(1) + 1.0
val output = (key, error,size)
state.update(Array(error,size))
Some(output)
}
val stateSpec = StateSpec.function(updateState _)
.numPartitions(1)
setupLogging()
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("test").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics).mapWithState(stateSpec)
のように見えますこのような例外になります
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
詳細が必要な場合はお知らせください。 ありがとうございました!
完全なスタックトレースを投稿できますか?また、map、filterのようなスパーク変換の中で直列化できないクラスを使用しようとしていますか? – Shankar
@Shankarこんにちは、 'loadMode'lを定義せずに' updateState'の外にモデル( 'val model = LogisticRegressionModel.load(sc、"/home/zefu/BIA800/LRModel ")')をロードするだけでうまくいきます。私は問題が 'sc'で来ると思います –
@Shankarと私はそこにコードを追加しました:P –