2015-09-08 3 views
13

私がメインの持っているスパークコンテキストを作成します。スパークのSQLデータフレーム - 輸入sqlContext.implicits._

val sc = new SparkContext(sparkConf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

そして、データフレームを作成し、データフレームにフィルタおよび検証を行います。

val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00") 

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0)) 
    // record length cannot be < 2 
    .na.drop(3) 
    // round to hours 
    .withColumn("time",convertToHourly($"time")) 

これは素晴らしいです。

しかし、私はデータフレーム&を取得

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 

にデータフレームを送信することにより、別のファイルに私の検証を移動してみてください検証および変換を行います。私は

import sqlContext.implicits._ 

To avoid the error: “value $ is not a member of StringContext” that happens on line: .withColumn("time",convertToHourly($"time"))

を必要とするように思えます

でも、import sqlContext.implicits._ を使用するには、sqlContextが新しく定義されている必要がありますそのようなファイル:

val sc = new SparkContext(sparkConf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

または私は2つのファイル(メイン&検証)にやろうとしているの分離が正しく行われていないように私は感じ

function ValidateAndTransform(df: DataFrame) : DataFrame = {...} 
function 

に送信...

これはどのように設計するのですか?または単にsqlContextを関数に送りますか?

ありがとうございます!

+0

にこの例を見てみることができます。私は何か良いことを思い付くことができなかったので、この質問に投票し、より良い欲求を待つ。 – Niemand

答えて

11

SQLContextの単一インスタンスで作業できます。私はちょうど新しいクラスのコンストラクタでSQLContextを渡すというようなものを分離したいと、私は各クラスごとに一度sqlContext.implicits._をインポートするときには、spark repository

/** Lazily instantiated singleton instance of SQLContext */ 
object SQLContextSingleton { 

    @transient private var instance: SQLContext = _ 

    def getInstance(sparkContext: SparkContext): SQLContext = { 
    if (instance == null) { 
     instance = new SQLContext(sparkContext) 
    } 
    instance 
    } 
} 
... 
//And wherever you want you can do 
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) 
import sqlContext.implicits._ 
+1

ありがとう!私はシングルトンオブジェクトを使用しましたが、私の場合、私はそれがとても一度だけ作成しなかったとしますオブジェクトSQLContextSingleton { @Transientするvarインスタンス:SQLContext = _ }メインからそれを初期化し、検証の上、それを使用していました。助けてくれてありがとう! –