2017-08-04 11 views
0

JDBCでpreparestatementを使用しようとしています。 ResultSetオブジェクトが生成されます。私はそれをスパークのデータフレームに変換したい。java ResultsetをSparkデータフレームに変換する方法

object JDBCRead { 

val tableName:String = "TABLENAME" 
val url :String = "jdbc:teradata://TERADATA_URL/user=USERNAME,password=PWD,charset=UTF8,TYPE=FASTEXPORT,SESSIONS=10" 
val selectTable:String = "SELECT * FROM " + tableName +" sample 10"; 

val con : Connection = DriverManager.getConnection(url); 


val pstmt2: PreparedStatement = con.prepareStatement(selectTable) 

import java.sql.ResultSet 

val rs: ResultSet = pstmt2.executeQuery 



val rsmd: ResultSetMetaData = rs.getMetaData 
while(rs.next()!=null) 
{ 
    val k: Boolean = rs.next() 
    for(i<-1 to rsmd.getColumnCount) { 
    print(" " + rs.getObject(i)) 
    } 
    println() 
} 

} 

上記のコードをSpark Dataframeから呼び出して、データフレームにデータをロードし、結果をより速く分散することができます。

私はPreparedStatementを使用する必要があります。 TeradataのFASTEXPORTがjdbc loadで動作しないため、私はspark.jdbc.loadを使用できません。それを使用する必要がありますPreparedStatement

これを行うには?どのようにしてSpark Dataframeにロードするために、SELECTステートメントと一緒にpreparestatementを使用することができますか?

答えて

1

-

要件 1. DataFrame 2. JdbcRDD

、この種のために利用可能な2つのオプションがあります私の知る限り、あなたはPreparedStatementのにとても固有のものですので、私は(JdbcRDDを提供したいです)

次に、prepareStatement内部的にはcomputeメソッドである。したがって、接続を作成して明示的に管理する必要はありません(エラーが発生しやすい)。

その後、あなたは他のパラメータを設定することができますスピードのためにデータフレーム

にして、結果を変換することができます。 JdbcRDDの例コードの使用は、以下である

..

import org.apache.log4j.{Level, Logger} 
    import org.apache.spark.SparkContext 
    import org.apache.spark.SparkContext.__ 
    import org.apache.spark.SparkConf 
    import org.apache.spark.rdd.JdbcRDD 
    import java.sql.{connection, DriverManager,ResultSet} 


    object jdbcRddExample { 
    def main(args: Array[String]) { 

     // Connection String  
     VAL URL = "jdbc:teradata://SERVER/demo" 
     val username = "demo" 
     val password = "Spark" 
     Class.forName("com.teradata.jdbc.Driver").newInstance 
     // Creating & Configuring Spark Context 
     val conf = new SparkConf().setAppName("App1").setMaster("local[2]").set("spark.executor.memory",1) 
     val sc = new SparkContext(conf) 
     println("Start...") 
     // Fetching data from Database 
     val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password), 
     "select first_name, last_name, gender from person limit ?,?", 
     3,5,1,r => r.getString("last_name") + "," +r.getString("first_name")) 
     // Displaying the content 
     myRDD.foreach(println) 
     // Saving the content inside Text File 
     myRDD.saveAsTextFile("c://jdbcrdd") 

     println("End...") 
    } 
    } 
関連する問題