2016-05-26 8 views
1

Spark DataframereaderのMechanicsに関する質問があります。誰かが私を助けることができれば感謝します。ここでシナリオを説明しましょうSpark Dataframereaderについてjdbc

私はこのようにDstreamからDataFrameを作成しています。この入力にデータ

var config = new HashMap[String,String](); 
     config += ("zookeeper.connect" ->zookeeper);   
     config += ("partition.assignment.strategy" ->"roundrobin"); 
     config += ("bootstrap.servers" ->broker); 
     config += ("serializer.class" -> "kafka.serializer.DefaultEncoder"); 
     config += ("group.id" -> "default"); 

     val lines = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,config.toMap,Set(topic)).map(_._2) 

     lines.foreachRDD { rdd => 

       if(!rdd.isEmpty()){ 

        val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }  





        val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext) 

        val rddDF = sqlContext.read.json(rddJson) 

        rddDF.registerTempTable("inputData") 



val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD","A",numOfPartiton,lowerBound,upperBound) 

ここReadDataFrameHelper

def readDataFrameHelperFromDB(sqlContext:HiveContext,jdbcUrl:String,dbTableOrQuery:String, 
      columnToPartition:String,numOfPartiton:Int,lowerBound:Int,highBound:Int):DataFrame={ 

     val jdbcDF = sqlContext.read.jdbc(url = jdbcUrl, table = dbTableOrQuery, 
       columnName = columnToPartition, 
       lowerBound = lowerBound, 
       upperBound = highBound, 
       numPartitions = numOfPartiton, 
       connectionProperties = new java.util.Properties() 
       ) 

      jdbcDF 

    } 

のコードは最後に、私はこの

のように参加
val joinedData = rddDF.join(dbDF,rddDF("ID") === dbDF("ID") 
           && rddDF("CODE") === dbDF("CODE"),"left_outer") 
         .drop(dbDF("code")) 
         .drop(dbDF("id")) 
         .drop(dbDF("number")) 
         .drop(dbDF("key")) 
         .drop(dbDF("loaddate")) 
         .drop(dbDF("fid")) 
joinedData.show() 

をやっている私の入力DSTREAMは1000行とデータを持っていますが含まれています百万行だから、私はこの結合を行うと、データベースからすべての行を読み込み、それらの行を読み上げるか、これはちょうど入力DStreamからcode,idを持っているDBからそれらの特定の行を読み取るでしょう

+1

これは完全な表をロードします。ここには述語プッシュはありません。 – zero323

答えて

2

zero323で指定されているように、データがテーブルから完全に読み込まれることを確認しました。私はDBセッションログをチェックして、データセット全体がロードされているのを見ました。

ありがとうございますzero323

関連する問題