1
Spark DataframereaderのMechanicsに関する質問があります。誰かが私を助けることができれば感謝します。ここでシナリオを説明しましょうSpark Dataframereaderについてjdbc
私はこのようにDstreamからDataFrameを作成しています。この入力にデータ
var config = new HashMap[String,String]();
config += ("zookeeper.connect" ->zookeeper);
config += ("partition.assignment.strategy" ->"roundrobin");
config += ("bootstrap.servers" ->broker);
config += ("serializer.class" -> "kafka.serializer.DefaultEncoder");
config += ("group.id" -> "default");
val lines = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,config.toMap,Set(topic)).map(_._2)
lines.foreachRDD { rdd =>
if(!rdd.isEmpty()){
val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }
val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)
val rddDF = sqlContext.read.json(rddJson)
rddDF.registerTempTable("inputData")
val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD","A",numOfPartiton,lowerBound,upperBound)
ここReadDataFrameHelper
def readDataFrameHelperFromDB(sqlContext:HiveContext,jdbcUrl:String,dbTableOrQuery:String,
columnToPartition:String,numOfPartiton:Int,lowerBound:Int,highBound:Int):DataFrame={
val jdbcDF = sqlContext.read.jdbc(url = jdbcUrl, table = dbTableOrQuery,
columnName = columnToPartition,
lowerBound = lowerBound,
upperBound = highBound,
numPartitions = numOfPartiton,
connectionProperties = new java.util.Properties()
)
jdbcDF
}
のコードは最後に、私はこの
のように参加val joinedData = rddDF.join(dbDF,rddDF("ID") === dbDF("ID")
&& rddDF("CODE") === dbDF("CODE"),"left_outer")
.drop(dbDF("code"))
.drop(dbDF("id"))
.drop(dbDF("number"))
.drop(dbDF("key"))
.drop(dbDF("loaddate"))
.drop(dbDF("fid"))
joinedData.show()
をやっている私の入力DSTREAMは1000行とデータを持っていますが含まれています百万行だから、私はこの結合を行うと、データベースからすべての行を読み込み、それらの行を読み上げるか、これはちょうど入力DStreamからcode,id
を持っているDBからそれらの特定の行を読み取るでしょう
これは完全な表をロードします。ここには述語プッシュはありません。 – zero323