2017-10-03 10 views
0

私はdataproc-bigQueryコネクタを使用してパーティション化されたテーブルを読み込み、300GB以上のデータとそのパーティションを日付で表示していますが、 sparkコネクタで読んで、私はすでにパーティション化されたbigqueryのビューでそれを読んでみましたが、うまくいきません、apache sparkでbigqueryテーブルからパーティションを読み込む方法はありますか?apache sparkでパーティションテーブルまたはビューを読み取る

更新(現在のコードスニペットを持つ):

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration 
    import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat 
    import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat 
    import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration 
    import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat 
    import com.google.gson.JsonObject 
    import org.apache.hadoop.io.LongWritable 
    import org.apache.hadoop.io.DoubleWritable 
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 
    import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel} 
    import org.apache.spark.mllib.util.MLUtils 
    import org.apache.spark.mllib.feature.{HashingTF, IDF} 
    import org.apache.spark.mllib.linalg.Vector 
    import org.apache.spark.rdd.RDD  


     @transient 
      val conf = sc.hadoopConfiguration 
      //path to the view 
      val fullyQualifiedInputTableId = "XXXX" 
      val projectId conf.get("fs.gs.project.id") 
      val bucket = conf.get("fs.gs.system.bucket") 
      conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)  
      conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)  
      BigQueryConfiguration.configureBigQueryInput(conf, 
      fullyQualifiedInputTableId)  
      val outputTableId = projectId + ":sparkBigQuery.classifiedQueries" 
      val outputGcsPath = ("gs://" +bucket+"/hadoop/tmp/bigquery/wordcountoutput")  
      BigQueryOutputConfiguration.configure(conf,outputTableId,null,outputGcsPath,BigQueryFileFormat.NEWLINE_DELIMITED_JSON,classOf[TextOutputFormat[_,_]])  
      conf.set("mapreduce.job.outputformat.class",classOf[IndirectBigQueryOutputFormat[_,_]].getName)   
      conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,"WRITE_TRUNCATE")  
      def convertToTuple(record: JsonObject) : (String, String,Double) = { 
       val user = record.get("user").getAsString 
       val query = record.get("query").getAsString.toLowerCase 
       val classifiedQuery= nb.predict(tf.transform(query.split(" ")))  
       return (user, query,classifiedQuery)  
      } 
     // Load data from BigQuery. 
     val tableData = sc.newAPIHadoopRDD(
      conf, 
      classOf[GsonBigQueryInputFormat], 
      classOf[LongWritable], 
      classOf[JsonObject]) 
tableData.map(entry=>convertToReadbale(entry._2)).first() 

val classifiedRDD = tableData.map(entry => convertToTuple(entry._2)) 

     classifiedRDD.take(10).foreach(l => println(l)) 
+0

使用したコードを貼り付けることはできますか? –

+0

spark bigQueryチュートリアルの標準コード – Mootaz

+0

@FelipeHoffa samelamin/spark-bigqueryとspotify/spark-bigqueryの2つの類似プロジェクトが見つかりました。問題は解決されましたが、私はまだ正式な方法があるかどうか疑問に思っていますこれをアドレスしてください。 – Mootaz

答えて

0

パーティションデコレータ( "$")を使用hereを文書化し、それがHadoop connector does support the "$" in the table name文字列のように見えます。

+0

これは質問に対する答えを提供しません。批評をしたり、著者の説明を求めるには、投稿の下にコメントを残してください。 - [レビューから](/レビュー/低品質の投稿/ 17575808) – clemens

関連する問題