2

私はカフカからの読み取りおよびCSVストリーミングデータセットのスキーマを動的に定義してcsvに書き込む方法は?

case class Event(map: Map[String,String]) 
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation 
val eventDataset: Dataset[Event] = spark 
    .readStream 
    .format("kafka") 
    .load() 
    .select("value") 
    .as[Array[Byte]] 
    .map(decodeEvent) 

Eventに書き込みをしようとしている内部Map[String,String]を保持し、私はいくつかのスキーマが必要になりますCSVに書き込むために、ストリーミングデータセットを持っています。

によって引き起こさ:

のは、すべてのフィールドがタイプStringのものであり、私はこれがライン「eventDataset.rdd」に、実行時にエラーが発生しますspark repo

val columns = List("year","month","date","topic","field1","field2") 
val schema = new StructType() //Prepare schema programmatically 
columns.foreach { field => schema.add(field, "string") } 
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
    columns.map(c => event.getOrElse(c, "") 
)} 
val df = spark.sqlContext.createDataFrame(rowRdd, schema) 

からの例を試してみましょう: org.apache.spark.sql.AnalysisException: ストリーミングソースのクエリは、writeStream.start();;で実行する必要があります。以下は

は「.MAP」のリストを持っているので、[文字列]は、プログラムのスキーマや構造化されたストリーミングデータセットでこれを達成するための方法はあり

eventDataset.map(event => columns.map(c => event.getOrElse(c,"")) 
.toDF(columns:_*) 

タプルない動作しませんか?

答えて

1

私は非常に単純なアプローチを使用したい:

import org.apache.spark.sql.functions._ 

eventDataset.select(columns.map(
    c => coalesce($"map".getItem(c), lit("")).alias(c) 
): _*).writeStream.format("csv").start(path) 

をいますが、現在のソリューションに近い何かがRDD変換をスキップしたい場合は

import org.apache.spark.sql.catalyst.encoders.RowEncoder 

eventDataset.rdd.map(event => 
    Row.fromSeq(columns.map(c => event.getOrElse(c,""))) 
)(RowEncoder(schema)).writeStream.format("csv").start(path) 
関連する問題