2017-05-22 15 views
0

私たちの要件は、Phoenix(HBase)の時系列テーブルでいくつかの分析操作を行うことです。私たちはuniqueIdsを持つpostgresqlのテーブルを持っています。スパークを使用して時系列データを処理する

ここでは、postgresqlテーブルからすべてのuniqueIdsを取得し、対応するuniqueIdsとPhoenixテーブルをクエリし、分析関数を適用しています。ここでは、すべてのuniqueIdが順次処理されています。これを平行して実行する必要があります。この機能を実現するためにスカラとスパークを使用しています。以下は

あなたは私がこれを行うにはより良いアプローチが何であるかを教えてください可能性があり

val optionsMap = Map("driver" -> config.jdbcDriver, "url" -> config.jdbcUrl, 
     "user" -> config.jdbcUser, "password" -> config.jdbcPassword, 
     "dbtable" -> query) 
val uniqDF = sqlContext.read.format("jdbc").options(optionsMap).load() 

val results = uniqDF.collect 

results.foreach { uniqId => 
    val data = loadHbaseData(uniqId) 
    data.map(func).save() 
} 

def loadHbaseData(id: String): DataFrame = { 
    sqlContext.phoenixTableAsDataFrame("TIMESERIETABLE", Array("TIMESTAMP", "XXXX",""), predicate = Some("\"ID\" = '" + uniqueId + "' "), conf = configuration) 
} 

、サンプルコードのですか?

答えて

0

scalaが提供するparallel collection機能を使用できます。

results.par.foreach { 
// Your code to be executed 
} 
+0

私たちはすでにこれを試していますが、使用していないため、約30K +ユニークなIDが1時間以内で処理される必要があります。並列収集は、それらをすべて処理するのに非常に時間がかかる。 –

0

この単一DataFrameにあなたの解析関数を適用し、その後、あなたのHBase DataFrame秒の労働組合である1 DataFrameを作成します。ような何か:

val hbaseDFs = results.map(loadHbaseData) 
val unitedDF = hbaseDFs.reduce(_ union _) 
unitedDF.map(func).save() 

unionを行う前RDD秒にDataFrame秒に変換する方が速いかもしれないので、このアプローチは、DataFrameの多数の(30K +別の答えにコメントによる)にunionを呼び出しhereと記載されている。

関連する問題