私はカフカからの読み取りおよびCSVストリーミングデータセットのスキーマを動的に定義してcsvに書き込む方法は?
case class Event(map: Map[String,String])
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation
val eventDataset: Dataset[Event] = spark
.readStream
.format("kafka")
.load()
.select("value")
.as[Array[Byte]]
.map(decodeEvent)
Event
に書き込みをしようとしている内部Map[String,String]
を保持し、私はいくつかのスキーマが必要になりますCSVに書き込むために、ストリーミングデータセットを持っています。
によって引き起こさ:
のは、すべてのフィールドがタイプ
String
のものであり、私はこれがライン「eventDataset.rdd」に、実行時にエラーが発生しますspark repoval columns = List("year","month","date","topic","field1","field2") val schema = new StructType() //Prepare schema programmatically columns.foreach { field => schema.add(field, "string") } val rowRdd = eventDataset.rdd.map { event => Row.fromSeq( columns.map(c => event.getOrElse(c, "") )} val df = spark.sqlContext.createDataFrame(rowRdd, schema)
からの例を試してみましょう: org.apache.spark.sql.AnalysisException: ストリーミングソースのクエリは、writeStream.start();;で実行する必要があります。以下は
は「.MAP」のリストを持っているので、[文字列]は、プログラムのスキーマや構造化されたストリーミングデータセットでこれを達成するための方法はあり
eventDataset.map(event => columns.map(c => event.getOrElse(c,""))
.toDF(columns:_*)
タプルない動作しませんか?