2016-10-10 6 views
0

スカラ座を使用してスパークにpriorly一時ビューを登録する必要なし(、SELECTFROMWITHJOIN Sの異なる種類を含む)SQL文を実行する方法はあります?目標は、DataFrameをSQLコードから迂回させることなく取得することです。スカラ&スパーク:登録tempviewせずにネイティブSQLのクエリー

the documentation

によって提供されるどのように動作する(既存のDataFrameを使用しtempview登録)例:

// df is an existing DataFrame 
df.createOrReplaceTempView("people") 
val sqlDF = spark.sql("SELECT * FROM people") 
sqlDF.show() 
DataFrame

既存の問題は、のために使用される基礎となるDataFrameのもののみサブ量、ですtempviewを生成することができます。 SQLステートメントが多くの異なるテーブルまたはビューからのデータを使用する場合、これは非常に実用的ではありません。何かのように

// SQL is directly executed on database 
val dfView = spark.sql(connectionProperties, 
         "SELECT * 
         FROM DATABASE_USER.V_VIEW_IN_DATABASE v1 
         JOIN DATABASE_USER.V_VIEW2_IN_DATABASE v2 
         ON v1.key = v2.key") 
dfView.show() 

自動タイプの推論で私の問題を解決するだろう。 this questionで指摘されている1つの可能性のあるパスを追いかけています。

セットアップ:Hadoopのv.2.7.32.0.0スパーク、インテリJのIDEA 2016.2、Scalaの2.11.8、Testcluster Win7のワークステーションは、Oracle 12cのデータベース上の

答えて

2

試してみてください。

import org.apache.spark.ml.feature.SQLTransformer 

val df = spark.read.format("jdbc") 
     .options(Map("url" -> "jdbc:postgresql://<server>:<port>/<db>?user=<user>&password=<password>", "dbtable" -> "<dbtable>")) 
     .load() 
val ans = new SQLTransformer() 
    .setStatement("SELECT value + id AS points FROM __THIS__") 
    .transform(df) 
ans.show() 

+------+ 
|points| 
+------+ 
| 101.0| 
| 202.0| 
+------+ 

あなたには、いくつかの砂糖のコーティングをしたい場合:

object ImprovedDataFrameContext { 
    import org.apache.spark.ml.feature.SQLTransformer 

    implicit class ImprovedDataFrame(df: org.apache.spark.sql.DataFrame) { 
     def T(query: String): org.apache.spark.sql.DataFrame = { 
      new SQLTransformer().setStatement(query).transform(df) 
     } 
    } 
} 
import ImprovedDataFrameContext._ 

val df = spark.read.format("jdbc") 
     .options(Map("url" -> "jdbc:postgresql://<server>:<port>/<db>?user=<user>&password=<password>", "dbtable" -> "<dbtable>")) 
     .load() 
     .T("<sql_query>") 
     .show() 

+------+ 
|points| 
+------+ 
| 101.0| 
| 202.0| 
+------+ 
+0

返信いただきありがとうございます! 'Seq((1.0,100.0)、(2.0,200.0))'をデータベースから直接データを照会するものに置き換えることはできません。 – Boern

+0

ありがとう、私はそれを理解しています。私が理解できないことは、サンプルデータフレームコードを、データベース(例えば、jdbc)からデータを照会するSQLで置き換える方法です。 '' SELECT * FROM DATABASE_USER.V_VIEW_IN_DATABASE ''のようなものです。つまり、SQLから 'DataFrame'を作成するにはどうしたらいいですか? – Boern

+0

私はpostgresdbからの読解を編集しました(あなたのsparkクラスパスにjdbcドライバを置き、適切な接続オプションを設定する必要があります)。 – Xavi

関連する問題