私はdataproc-bigQueryコネクタを使用してパーティション化されたテーブルを読み込み、300GB以上のデータとそのパーティションを日付で表示していますが、 sparkコネクタで読んで、私はすでにパーティション化されたbigqueryのビューでそれを読んでみましたが、うまくいきません、apache sparkでbigqueryテーブルからパーティションを読み込む方法はありますか?apache sparkでパーティションテーブルまたはビューを読み取る
更新(現在のコードスニペットを持つ):
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.DoubleWritable
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
@transient
val conf = sc.hadoopConfiguration
//path to the view
val fullyQualifiedInputTableId = "XXXX"
val projectId conf.get("fs.gs.project.id")
val bucket = conf.get("fs.gs.system.bucket")
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
BigQueryConfiguration.configureBigQueryInput(conf,
fullyQualifiedInputTableId)
val outputTableId = projectId + ":sparkBigQuery.classifiedQueries"
val outputGcsPath = ("gs://" +bucket+"/hadoop/tmp/bigquery/wordcountoutput")
BigQueryOutputConfiguration.configure(conf,outputTableId,null,outputGcsPath,BigQueryFileFormat.NEWLINE_DELIMITED_JSON,classOf[TextOutputFormat[_,_]])
conf.set("mapreduce.job.outputformat.class",classOf[IndirectBigQueryOutputFormat[_,_]].getName)
conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,"WRITE_TRUNCATE")
def convertToTuple(record: JsonObject) : (String, String,Double) = {
val user = record.get("user").getAsString
val query = record.get("query").getAsString.toLowerCase
val classifiedQuery= nb.predict(tf.transform(query.split(" ")))
return (user, query,classifiedQuery)
}
// Load data from BigQuery.
val tableData = sc.newAPIHadoopRDD(
conf,
classOf[GsonBigQueryInputFormat],
classOf[LongWritable],
classOf[JsonObject])
tableData.map(entry=>convertToReadbale(entry._2)).first()
val classifiedRDD = tableData.map(entry => convertToTuple(entry._2))
classifiedRDD.take(10).foreach(l => println(l))
使用したコードを貼り付けることはできますか? –
spark bigQueryチュートリアルの標準コード – Mootaz
@FelipeHoffa samelamin/spark-bigqueryとspotify/spark-bigqueryの2つの類似プロジェクトが見つかりました。問題は解決されましたが、私はまだ正式な方法があるかどうか疑問に思っていますこれをアドレスしてください。 – Mootaz