0

私はHive SqlをSQLのSparkに変換してクエリのパフォーマンスをテストしたいと思っています。ここに私のハイブSQLがあります。誰も私にハイブSQLをスパークSqlに変換する方法を提案することができます。そのままHive SqlからSpark Sqlへの変換

SELECT split(DTD.TRAN_RMKS,'/')[0] AS TRAB_RMK1, 
split(DTD.TRAN_RMKS,'/')[1] AS ATM_ID, 
DTD.ACID, 
G.FORACID, 
DTD.REF_NUM, 
DTD.TRAN_ID, 
DTD.TRAN_DATE, 
DTD.VALUE_DATE, 
DTD.TRAN_PARTICULAR, 
DTD.TRAN_RMKS, 
DTD.TRAN_AMT, 
SYSDATE_ORA(), 
DTD.PSTD_DATE, 
DTD.PSTD_FLG, 
G.CUSTID, 
NULL AS PROC_FLG, 
DTD.PSTD_USER_ID, 
DTD.ENTRY_USER_ID, 
G.schemecode as SCODE 
FROM DAILY_TRAN_DETAIL_TABLE2 DTD 
JOIN ods_gam G 
ON DTD.ACID = G.ACID 
where substr(DTD.TRAN_PARTICULAR,1,3) rlike '(PUR|POS).*' 
AND DTD.PART_TRAN_TYPE = 'D' 
AND DTD.DEL_FLG <> 'Y' 
AND DTD.PSTD_FLG = 'Y' 
AND G.schemecode IN ('SBPRV','SBPRS','WSSTF','BGFRN','NREPV','NROPV','BSNRE','BSNRO') 
AND (SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,6) IN ('405997','406228','406229','415527','415528','417917','417918','418210','421539','421572','432198','435736','450502','450503','450504','468805','469190','469191','469192','474856','478286','478287','486292','490222','490223','490254','512932','512932','514833','522346','522352','524458','526106','526701','527114','527479','529608','529615','529616','532731','532734','533102','534680','536132','536610','536621','539149','539158','549751','557654','607118','607407','607445','607529','652189','652190','652157') OR SUBSTR(split(DTD.TRAN_RMKS,'/')[0],1,8) IN ('53270200','53270201','53270202','60757401','60757402')) 
limit 50; 
+0

を構築し、あなたはそれを実行してみましたがありますか? Hiveは、Hive機能のほとんどをサポートしています。 SYSDATE_ORA()はクエリの中にあると分かっているので、あなたはUDFで何かする必要があります。 – MaxNevermind

+0

Sparkはハイブ機能のほとんどをサポートしています。 – MaxNevermind

+0

私はマップ、フィルターなどのスパーク関数を使用してSQLを減らしたいので、クエリのパフォーマンスが向上します。 –

答えて

0

ちょうどあなたはまだあなたがクエリを分析することができ、より良い結果を得るために必要となる場合があるから、すぐにあなたがその前のMapReduce上のハイブでこのクエリを実行する場合は、この恩恵を受ける必要があり、それを使用してみてくださいさらにパーティション化を使用するように計画し、最適化します。 Sparkはメモリをより重く使用し、単純な変換を超えてMapReduceよりも一般に高速ですが、Spark SQLもCatalyst Optimizerを使用します。

「マップ、フィルタなどのスパーク関数を使用する」というご意見を参考に、map()はデータを変換するだけですが、文字列関数を使用しています。.map(...)を使用して何かを得ることはできません。あなた、filter()入力データをフィルタできれば、サブクエリやその他のSQL機能を使ってクエリを書き換えることができます。

2

質問は上記のコードを書くのに時間がかかりますが、私はここでコードを書こうとはしませんが、私はDataFramesアプローチを提供します。 DataFrameを使用して、クエリの上に実装するための柔軟性を持っている

Column操作 ..(あなたがScalaの機能/ udfにハイブUDFに適用/変換する場合)withColumnfilterなどの鋳造データ型などのためcast最近私がしましたこれとその演技者を行った。以下 はScalaの

val df1 = hivecontext.sql ("select * from ods_gam").as("G") 
    val df2 = hivecontext.sql("select * from DAILY_TRAN_DETAIL_TABLE2).as("DTD") 

で擬似コードで、

val joinedDF = df1.join(df2 , df1("G.ACID") = df2("DTD.ACID"), "inner") 
// now apply your string functions here... 
joinedDF.withColumn or filter ,When otherwise ... blah.. blah here 

注意をあなたのデータフレームを使用して参加:私はあなたのケースのUDFが必要とされていないで、単純な文字列関数が十分であろうと思います。

ダミーデータに

import util.Random 
import org.apache.spark.sql.Row 
implicit class Crossable[X](xs: Traversable[X]) { 
    def cross[Y](ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y) 
} 
val students = Seq("John", "Mike","Matt") 
val subjects = Seq("Math", "Sci", "Geography", "History") 
val random = new Random(1) 
val data =(students cross subjects).map{x => Row(x._1, x._2,random.nextInt(100))}.toSeq 

// Create Schema Object 
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType} 
val schema = StructType(Array(
      StructField("student", StringType, nullable=false), 
      StructField("subject", StringType, nullable=false), 
      StructField("score", IntegerType, nullable=false) 
    )) 

// Create DataFrame 
import org.apache.spark.sql.hive.HiveContext 
val rdd = sc.parallelize(data) 
val df = sqlContext.createDataFrame(rdd, schema) 

// Define udf 
import org.apache.spark.sql.functions.udf 
def udfScoreToCategory=udf((score: Int) => { 
     score match { 
     case t if t >= 80 => "A" 
     case t if t >= 60 => "B" 
     case t if t >= 35 => "C" 
     case _ => "D" 
    }}) 
df.withColumn("category", udfScoreToCategory(df("score"))).show(10) 
+0

注:Plsは@MaxNevermindの考え方を考慮して、文字列関数を持つ簡単なsqlを持っています。spark SQLアプローチまたはhiveqlアプローチを進める前に、パフォーマンス上の利点を知るために小さなpocを行います。 –