2017-05-23 6 views
1

私は次のクラスを持っています。runは、データベーステーブルからintのリストを返します。データフレームを強力な型付きデータセットに変換しますか?

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) { 
    def run(date: LocalDate) = { 
    sqlContext.read.format("jdbc").options(Map(
     "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver", 
     "url" -> jdbcSqlConn, 
     "dbtable" -> s"dbo.GetList('$date')" 
    )).load() 
    } 
} 

次のコード

val conf = new SparkConf() 
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]")) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

val itemListJob = new ItemList(sqlContext, jdbcSqlConn) 
val processed = itemListJob.run(rc, priority).select("id").map(d => { 
    runJob.run(d) // d expected to be int 
}) 
processed.saveAsTextFile("c:\\temp\\mpa") 

 
[error] ...\src\main\scala\main.scala:39: type mismatch; 
[error] found : org.apache.spark.sql.Row 
[error] required: Int 
[error]  runJob.run(d) 
[error]    ^
[error] one error found 
[error] (compile:compileIncremental) Compilation failed 

のエラーを取得する私は

  1. val processed = itemListJob.run(rc, priority).select("id").as[Int].map(d =>
  2. を試してみました

人とも

データセットに保存されているタイプのためのエンコーダを見つけることができませんのエラーを得ました。 spark.implicits._をインポートすることで、プリミティブ型(Int、Stringなど)とProduct型(ケースクラス)がサポートされます。他の型をシリアル化するためのサポートは、将来のリリースで追加されます。

更新: 私は

  1. import sc.implicits._輸入暗黙のステートメントを追加しようとしているが

    値の暗黙の誤差がorg.apache.sparkのメンバーではありませんです.SparkContext

  2. import sqlContext.implicits._は問題ありません。しかしながら、processed.saveAsTextFile("c:\\temp\\mpa")の後の文は

値saveAsTextFileの誤差がorg.apache.spark.sql.Dataset [(INT、java.time.LocalDate)]のメンバーではない持って

select("id").as[Int] 

あなたがにRowsを変換するための暗黙をインポートする必要があります。次のようにあなたが単にあることをselect("id")の行を変更する必要があり

+0

as [Int]を使用しているときのエラーは何ですか? – zsxwing

+0

エラーは 'データセットに格納された型のエンコーダを見つけることができません。 ' – ca9163d9

+0

あなたは' import spark.implicits._'を試してみましたか?他のタイプをシリアライズするためのサポートは、将来のリリースで追加されます。 ? – zsxwing

答えて

3

Ints。

import sqlContext.implicits._ // <-- import implicits that add the "magic" 

次のようにも(私が追加された行へのコメントは注意してください)変換を含めるようにrunを変更することができます:

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) { 
    def run(date: LocalDate) = { 
    import sqlContext.implicits._ // <-- import implicits that add the "magic" 
    sqlContext.read.format("jdbc").options(Map(
     "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver", 
     "url" -> jdbcSqlConn, 
     "dbtable" -> s"dbo.GetList('$date')" 
    )).load() 
    .select("id") // <-- take only "id" (which Spark pushes down and hence makes your query faster 
    .as[Int] // <-- convert Row into Int 
    } 
} 

値saveAsTextFileがメンバーではありませんorg.apache.spark.sql.Dataset [(Int、java.time。あなたが利用できないDatasetsaveAsTextFile操作を使用しようとするためLOCALDATE)]

コンパイルエラーです。外部ストレージに出て非ストリーミングデータセットの内容を保存するためのDataFrameWriter [T]インタフェース:

書き込み:スパークSQLで

書き込みがwrite演算子を使用して利用可能ですDataFrameWriterています。

だから、次の操作を行う必要があります。

完了
processed.write.text("c:\\temp\\mpa") 

+0

私は変更されましたItemList.run()を呼び出して、提案したようにDataSetを返します。しかし、 'val processed = itemListJob.run(...)。map(....)'を呼び出す前に 'import sqlContext.implicits._'を追加する必要があります。 – ca9163d9

関連する問題