Spark 2.1.1(kafka 0.10+)を使用してKafkaのトピックを読んでいますが、ペイロードはJSON文字列です。私はスキーマで文字列を解析し、ビジネスロジックで前進したいと思います。from_jsonが "not found:value from_json"で失敗するのはなぜですか?
皆さんは、私がfrom_json
を使用してJSON文字列を解析することを提案しているようですが、私の状況ではコンパイルされていないようです。エラーが
not found : value from_json
.select(from_json($"json", txnSchema) as "data")
私はスパークシェルに以下の行を試みたときに、それだけで正常に動作している -
val df = stream
.select($"value" cast "string" as "json")
.select(from_json($"json", txnSchema) as "data")
.select("data.*")
すべてのアイデア、私はこの作品がで働いて見にコードで間違って何をやっている可能性がシェルではなくIDE /コンパイル時に? - import org.apache.spark.sql.functions._
あなたはおそらく、関連するインポートを逃している
import org.apache.spark.sql._
object Kafka10Cons3 extends App {
val spark = SparkSession
.builder
.appName(Util.getProperty("AppName"))
.master(Util.getProperty("spark.master"))
.getOrCreate
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
.option("subscribe", src_topic)
.load
val txnSchema = Util.getTxnStructure
val df = stream
.select($"value" cast "string" as "json")
.select(from_json($"json", txnSchema) as "data")
.select("data.*")
}
いいえ、私はSpark 2.1.1を使用していますが、このインポートが機能します。私が考えることができるのは、クラスパスに2つの異なるバージョンがあり、コンパイラで取り上げられたバージョンが古すぎるということだけです。 –