7
私は[Databricks] [1]からサンプルを再現し、kafkaの新しいコネクタに適用し、構造化されたストリーミングを実行しようとしていましたが、sparkのout-of-the-boxメソッドを使ってjsonを正しく解析できません。kafka connect 0.10とSpark Structured Streamingでfrom_jsonを使用するには?
注:トピックはjson形式でkafkaに書き込まれます。
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
次のコードは、私が
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
任意のヒント...列JSONが文字列であると署名from_json方法と一致していないので、それはだと信じて、動作しないのだろうか?
[UPDATE]例の作業: https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala
あなたはコンパイラの警告がある場合は、「値$がメンバーではありませんが...」インポートを忘れないでください、それが私のための質問を – user1459144
を把握するために私にさらに5〜10分を要したspark.implicits._どのライブラリが "from_json"という関数を提供していますか?私はそれを置くように思わない!助けてください.. – Gyan
@Raghav - >インポートorg.apache.spark.sql.functions._ ここの例を確認してください:https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/マスター/ src/main/scala-2.11/Main.scala –