2016-10-07 22 views
0

私は、複数のソースシステム(Mysqlインスタンス)から5分ごとにデータを取得し、他のデータ(S3に存在することができます)で結合して豊かにする必要があります。SparkでのMysqlデータ処理

Sparkでこの処理を行い、複数のエグゼキュータに実行を分散したいと考えました。

主な問題は、私がMysqlでルックアップを行うたびに、私は最新のレコードを取得したいだけです(lastModifiedOn> timestampで言うことができます)。 この選択的なMySql行のフェッチは、どのように効果的に実行できますか? これは私がしようとしているものです:

val filmDf = sqlContext.read.format("jdbc") 
    .option("url", "jdbc:mysql://localhost/sakila") 
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "film").option("user", "root").option("password", "") 
    .load() 
+0

はあなたがしようとしているものとのあなたの質問を更新することはできますか? – eliasah

+0

@eliasahはい投稿を更新します。 – Karshit

答えて

0

をあなたは、JDBCデータソースとスパークSQLを使用する必要があります。私はあなたに例を示します。

val res = spark.read.jdbc(
     url = "jdbc:mysql://localhost/test?user=minty&password=greatsqldb", 
     table = "TEST.table", 
     columnName = "lastModifiedOn", 
     lowerBound = lowerTimestamp, 
     upperBound = upperTimestamp, 
     numPartitions = 20, 
     connectionProperties = new Properties() 
    ) 

Apacheのスパークテストスイートでより多くの例があります:https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

+0

ありがとう、numPartitionsの意味は何ですか? 接続プロパティも同様ですか? – Karshit

+0

結果は、numPartitionsパーティションを持つDataFrameになります。 Sparkは並列にnumPartitionsクエリを実行して結果を取得します。例: lowerBound = 1、upperBound = 10、numPartitions = 2、Sparkは2つのクエリ、最初は1〜5、もう1つは6〜10です。 – gasparms

+0

connectionPropertiesは、いくつかのプロパティをdbに渡すためのマップです。あなたのデータベースに依存して、それを使用するかどうかは決まります。 – gasparms

関連する問題