2017-10-09 8 views
0

spark job server API(spark 2.2.0用)を使用してアプリケーションを構築しようとしています。しかし、sparkSessionでnamedObjectをサポートしていないことがわかりました。 ように私のルックス:なぜsparkSessionにsparkSessionのサポートがありませんか?

import com.typesafe.config.Config 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.storage.StorageLevel 
import org.scalactic._ 
import spark.jobserver.{NamedDataFrame, NamedObjectSupport, SparkSessionJob} 
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem} 

import scala.util.Try 

object word1 extends SparkSessionJob with NamedObjectSupport { 
    type JobData = Seq[String] 
    type JobOutput = String 

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = 
{ 
    val df = sparkSession.sparkContext.parallelize(data) 
    val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY) 
    this.namedObjects.update("df1", ndf) 
    this.namedObjects.getNames().toString 
} 


def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config): 
    JobData Or Every[ValidationProblem] = { 
Try(config.getString("input.string").split(" ").toSeq) 
    .map(words => Good(words)) 
    .getOrElse(Bad(One(SingleProblem("No input.string param")))) 
    } 

} 

が、ラインthis.namedObjects.updateで()エラーが発生しています。私は彼らがnamedObjectをサポートしていないと思います。同じコードがSparkJobでコンパイルされています:

object word1 extends SparkJob with NamedObjectSupport 

sparksessionの名前付きオブジェクトのサポートはありますか?そうでない場合は、データフレーム/データセットを永続化するのはどうですか?

答えて

0

私はそれを理解しました。それは私の側から愚かな間違いだった。 https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-api/src/main/scala/spark/jobserver/NamedObjectSupport.scala#L138から。それは言う通り:

// NamedObjectSupportは、api.SparkJobBaseのJobEnvironmentのためにもう必要ありません。また、互換性のために古いspark.jobserver.SparkJobBaseに自動的にインポートされます。 です。

import com.typesafe.config.Config 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.storage.StorageLevel 
import org.scalactic._ 
import spark.jobserver.{NamedDataFrame, NamedObjectSupport, SparkSessionJob} 
import spark.jobserver.api.{JobEnvironment, SingleProblem, ValidationProblem} 

import scala.util.Try 

object word1 extends SparkSessionJob with NamedObjectSupport { 
    type JobData = Seq[String] 
    type JobOutput = String 

def runJob(sparkSession: SparkSession, runtime: JobEnvironment, data: JobData): JobOutput = 
    { 
    val df = sparkSession.sparkContext.parallelize(data) 
    val ndf = NamedDataFrame(df, true, StorageLevel.MEMORY_ONLY) 
    runtime.namedObjects.update("df1", ndf) 
    runtime.namedObjects.getNames().toString 
    } 


def validate(sparkSession: SparkSession, runtime: JobEnvironment, config: Config): 
    JobData Or Every[ValidationProblem] = { 
Try(config.getString("input.string").split(" ").toSeq) 
    .map(words => Good(words)) 
    .getOrElse(Bad(One(SingleProblem("No input.string param")))) 
    } 

} 

@Deprecated 
trait NamedObjectSupport 

したがって、我々はにこのコードを変更する必要があり、これらの機能にアクセスするには

関連する問題