1

私はKafkaストリームをSparkに接続しました。私はApache Spark Mlibモデルをストリーミングされたテキストに基づいて予測するように訓練しました。私の問題は、DataFrameworkを渡す必要があるという予測を得ることです。Spark StreamでDataFrameを作成

//kafka stream  
val stream = KafkaUtils.createDirectStream[String, String](
      ssc, 
      PreferConsistent, 
      Subscribe[String, String](topics, kafkaParams) 
     ) 
//load mlib model 
val model = PipelineModel.load(modelPath) 
stream.foreachRDD { rdd => 

     rdd.foreach { record => 
     //to get a prediction need to pass DF 
     val toPredict = spark.createDataFrame(Seq(
      (1L, record.value()) 
     )).toDF("id", "review") 
     val prediction = model.transform(test) 
     } 
} 

私の問題は、スパークストリーミングではDataFrameを作成できないということです。それを行う方法はありますか?ケースクラスや構造体を使用できますか?

+0

DataFrameworkまたはDataFrame ?? – Gevorg

答えて

2

コアSparkの場合と同様に、DataFrameまたはDatasetをRDDから作成することができます。そのためには、スキーマを適用する必要があります。 foreachRDD内で、結果のRDDを、MLパイプラインでさらに使用できるDataFrameに変換することができます。

// we use a schema in the form of a case class 
case class MyStructure(field:type, ....) 
// and we implement our custom transformation from string to our structure 
object MyStructure { 
    def parse(str: String) : Option[MyStructure] = ... 
} 

val stream = KafkaUtils.createDirectStream... 
// give the stream a schema using a case class 
val strucStream = stream.flatMap(cr => MyStructure.parse(cr.value)) 

strucStream.foreachRDD { rdd => 
    import sparkSession.implicits._ 
    val df = rdd.toDF() 
    val prediction = model.transform(df) 
    // do something with df 
} 
+0

pysparkを使って同じことをするにはどうすればいいですか?私はpysparkでまったく同じ問題を抱えています –

関連する問題