2016-04-21 8 views
0

テーブルをBQからPySpark RDDにダウンロードします。もう一度アップロードするには?PySpark RDDをBigQueryにアップロード

dGSConfig = { 
    'project_id': "project_id", 
    'bucket': "bucket_id" 
} 
dBQConfig = { 
    'gs_config': dGSConfig, 
    'project_id': "project_id", 
    'dataset_id': "dataset_id", 
    'table_id': "table_id" 
} 

oSc = instantiate_pyspark() 
rddData, lsHeadings = get_table_cloud(oSc, dBQConfig) #rddData has a list-of-lists type format 




def instantiate_pyspark(): 
    """instantiate the pyspark RDD stuff""" 
    import pyspark 

    oSc = pyspark.SparkContext() 
    oHadoopConf = oSc._jsc.hadoopConfiguration() 
    oHadoopConf.get("fs.gs.system.bucket") 

    return oSc 


def get_table_cloud(oSc, dBQConfig): 
    """get a table from bigquery via google cloud storage 
    Config format: 
     dGSConfig = {'project_id': '', 'bucket': ''} 
     dBQConfig = {'project_id: '', 'dataset_id': '', 'table_id': ''} 
    """ 
    dGSConfig = dBQConfig['gs_config'] 

    dConf = { 
     "mapred.bq.project.id": dGSConfig['project_id'], 
     "mapred.bq.gcs.bucket": dGSConfig['bucket'], 
     "mapred.bq.input.project.id": dBQConfig['project_id'], 
     "mapred.bq.input.dataset.id":dBQConfig['dataset_id'], 
     "mapred.bq.input.table.id": dBQConfig['table_id'] 
    } 

    rddDatasetRaw = oSc.newAPIHadoopRDD(
     "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat", 
     "org.apache.hadoop.io.LongWritable", 
     "com.google.gson.JsonObject", 
     conf=dConf 
    ) 

    import json 
    lsHeadings = json.loads(rddDatasetRaw.take(1)[0][1]).keys() 

    rddDataset = (
     rddDatasetRaw 
     .map(lambda t, json=json: json.loads(t[1]).values()) 
    ) 

    return rddDataset, lsHeadings 

答えて

0

3方法:

1)にBigQueryに入るために、Googleのストレージにアップロードし、別のプロセスをローカルCSVを作成します。

llData = rddData.collect() 


with open(sCsvPath, 'w') as f: 
    import csv 
    oWriter = csv.writer(f) 
    for lData in llData: 
     oWriter.writerow(lData) 

import subprocess 
lsCommand = ['gsutil', 'cp', sCsvPath, sGooglePath] 
subprocess.check_output(lsCommand) 

2)パンダを使用してBigQueryに直接アップロードする:

3)ストレートストレージ使用pysparkに配布結果を保存します。これらのどれもが、私はもともと欲しかったものではない、それはBQにまっすぐに結果をアップロードするPySparkの方法ですが

#remove previous dir if exists 
import subprocess 
lsCommand = ['gsutil', 'rm', '-r', sGooglePath] 
subprocess.check_output(lsCommand) 

rddSave.saveAsTextFile(sGooglePath) 

1

いくつかの中間ファイルにエクスポートし、それらのファイルをBigQueryに読み込むことができます。

これは役立つかもしれない:私はいくつかの時点で使用how to export a table dataframe in pyspark to csv?

関連する問題