2017-04-26 7 views
1

Spark Streamingを使用して、HDFSの寄木細工ファイルをMS SQL Serverにコピーしようとしています。 MS SQL Server用のJDBCドライバを使用しています。 私のコードは次のとおりです。Structured Streamingを使用してparquetファイルをHDFSからMS SQL Serverにコピーする方法?

val spark = SparkSession.builder().master("yarn").appName("StreamAFile").getOrCreate(); 
val userSchema = new StructType().add("mandt","string").add("lifnr","string").add("land1","string").add("name1","string").add("name2","string"); 
val myDF = spark.readStream.format("parquet").schema(userSchema).load("/parquetfilepath/*"); 
val query = myDF.writeStream.format("jdbc").option("driver","net.sourceforge.jtds.jdbc.Driver").option("dbtable","mytable").option("user","username").option("password","password").option("checkpointLocation","/homedirectory/").start("jdbc:jtds:sqlserver://SQLServer1:1433;DatabaseName=MyDB"); 

私はエラーを取得しています:誰もがこの前に働いていた場合

java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing 

は修正してください。エラーとして

答えて

0

は言う:

Data source jdbc does not support streamed writing

それは構造化されたストリーミングではできません。以前のSpark Streaming APIを使用すると、より良い結果が得られるかもしれません(しかし、これはますます陳腐化しているので、この方法はお勧めしません)。

なぜこれは構造化ストリーミングを使用していますか?バッチSparkアプリケーションを作成しないでください。つまり、spark.readspark.writeです。それはうまくいくはずです。そしてcronを使うと、あなたは通常の処刑を受けることができます。

p.s.私はそのような仕事のためにスパークを使うつもりはないと思う。私はOozieなどがこのユースケースに適していると思います。 Sparkが私を恐れさせるような分散処理はありません。

+0

ありがとうございます。私が構造化ストリーミングを使用していた理由は、リアルタイムでデータをSQL Serverにコピーすることです。私はバッチSparkアプリケーションが並列ではなく順次実行することを期待しています。 –

+0

ストラクチャードストリーミングについても同様です(バッチ指向のままです)。そういうわけで私はいくつかの選択肢を提案することにとても熱心でした。 _魔法はありません –

0

スパークの構造化されたストリーミングは、foreach()の機能を提供するので、自分でJDBCSink()を定義するために使用できます。 This linkは、構造化ストリーミングでmysqlを使用するための優れたデモンストレーションです。 https://github.com/cynthia1wang/jdbcsink/blob/master/src/main/scala/DNSstat.scala

class JDBCSink() extends ForeachWriter[Row] { 
    val driver = "com.mysql.jdbc.Driver" 
    var connection:Connection = _ 
    var statement:Statement = _ 

    def open(partitionId: Long,version: Long): Boolean = { 
     Class.forName(driver) 
     connection = DriverManager.getConnection("jdbc:mysql://10.88.1.102:3306/aptwebservice", "root", "mysqladmin") 
     statement = connection.createStatement 
     true 
    } 
    def process(value: Row): Unit = { 
     statement.executeUpdate("replace into DNSStat(ip,domain,time,count) values(" 
           + "'" + value.getString(0) + "'" + ","//ip 
           + "'" + value.getString(1) + "'" + ","//domain 
           + "'" + value.getTimestamp(2) + "'" + "," //time 
           + value.getLong(3) //count 
           + ")") 
} 
    def close(errorOrNull: Throwable): Unit = { 
     connection.close 
    } 
} 
関連する問題