2017-05-19 6 views
3

Spark 2.1.1(kafka 0.10+)を使用してKafkaのトピックを読んでいますが、ペイロードはJSON文字列です。私はスキーマで文字列を解析し、ビジネスロジックで前進したいと思います。from_jsonが "not found:value from_json"で失敗するのはなぜですか?

皆さんは、私がfrom_jsonを使用してJSON文字列を解析することを提案しているようですが、私の状況ではコンパイルされていないようです。エラーが

not found : value from_json 
.select(from_json($"json", txnSchema) as "data") 

私はスパークシェルに以下の行を試みたときに、それだけで正常に動作している -

val df = stream 
    .select($"value" cast "string" as "json") 
    .select(from_json($"json", txnSchema) as "data") 
    .select("data.*") 

すべてのアイデア、私はこの作品がで働いて見にコードで間違って何をやっている可能性がシェルではなくIDE /コンパイル時に? - import org.apache.spark.sql.functions._あなたはおそらく、関連するインポートを逃している

import org.apache.spark.sql._ 

object Kafka10Cons3 extends App { 
    val spark = SparkSession 
    .builder 
    .appName(Util.getProperty("AppName")) 
    .master(Util.getProperty("spark.master")) 
    .getOrCreate 

    val stream = spark 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker")) 
    .option("subscribe", src_topic) 
    .load 

    val txnSchema = Util.getTxnStructure 
    val df = stream 
    .select($"value" cast "string" as "json") 
    .select(from_json($"json", txnSchema) as "data") 
    .select("data.*") 
} 

答えて

7

は、ここでは、コードです。

spark.implicits._org.apache.spark.sql._をインポートしましたが、これらのいずれもfunctionsに個別の関数をインポートするものはありません。


私も、それはまた、(それが?最初にインポートされたので)との私のバージョンと明らかに互換性がありませんでしたどのコンパイラが選んだ一つとなっている必要があります from_json機能を有しているように見える com.wizzardo.tools.jsonをインポートした

spark

他のjsonライブラリからfrom_json関数をインポートしていないことを確認してください。このライブラリは使用しているsparkのバージョンと互換性がない可能性があります。

+0

いいえ、私はSpark 2.1.1を使用していますが、このインポートが機能します。私が考えることができるのは、クラスパスに2つの異なるバージョンがあり、コンパイラで取り上げられたバージョンが古すぎるということだけです。 –

関連する問題