2015-12-10 19 views

私はPandas library to read BigQueryデータを使いたいです。大きな結果を許可するにはどうすればよいですか?
パンダ以外のBigQueryインタラクションでは、これはthisのように実現できます。Python BigQuery allowLargeResults with pandas.io.gbq


sProjectID = "project-id" 
sQuery = ''' 
     column1, column2 
    FROM [dataset_name.tablename] 
from pandas.io import gbq 
df = gbq.read_gbq(sQuery, sProjectID) 




大きな結果は、BigQuery - > Storage - > local - > dataframeパターンを実際に実行する必要があります。





pip install pandas 
pip install google-cloud-storage 
pip install google-cloud-bigquery 


We require python 3 for the google cloud python API 
    mkvirtualenv --python `which python3` env3 
And our dependencies: 
    pip install pandas 
    pip install google-cloud-bigquery 
    pip install google-cloud-storage 
import os 
import time 
import uuid 

from google.cloud import bigquery 
from google.cloud import storage 
import pandas as pd 

def bq_to_df(project_id, dataset_id, table_id, storage_uri, local_data_path): 
    """Pipeline to get data from BigQuery into a local pandas dataframe. 

    :param project_id: Google project ID we are working in. 
    :type project_id: str 
    :param dataset_id: BigQuery dataset id. 
    :type dataset_id: str 
    :param table_id: BigQuery table id. 
    :type table_id: str 
    :param storage_uri: Google Storage uri where data gets dropped off. 
    :type storage_uri: str 
    :param local_data_path: Path where data should end up. 
    :type local_data_path: str 
    :return: Pandas dataframe from BigQuery table. 
    :rtype: pd.DataFrame 
    bq_to_storage(project_id, dataset_id, table_id, storage_uri) 

    storage_to_local(project_id, storage_uri, local_data_path) 

    data_dir = os.path.join(local_data_path, "test_data") 
    df = local_to_df(data_dir) 

    return df 

def bq_to_storage(project_id, dataset_id, table_id, target_uri): 
    """Export a BigQuery table to Google Storage. 

    :param project_id: Google project ID we are working in. 
    :type project_id: str 
    :param dataset_id: BigQuery dataset name where source data resides. 
    :type dataset_id: str 
    :param table_id: BigQuery table name where source data resides. 
    :type table_id: str 
    :param target_uri: Google Storage location where table gets saved. 
    :type target_uri: str 
    :return: The random ID generated to identify the job. 
    :rtype: str 
    client = bigquery.Client(project=project_id) 

    dataset = client.dataset(dataset_name=dataset_id) 
    table = dataset.table(name=table_id) 

    job = client.extract_table_to_storage(
     str(uuid.uuid4()), # id we assign to be the job name 
    job.destination_format = 'CSV' 
    job.write_disposition = 'WRITE_TRUNCATE' 

    job.begin() # async execution 

    if job.errors: 

    while job.state != 'DONE': 
     print("exporting '{}.{}' to '{}': {}".format(
      dataset_id, table_id, target_uri, job.state 


    return job.name 

def storage_to_local(project_id, source_uri, target_dir): 
    """Save a file or folder from google storage to a local directory. 

    :param project_id: Google project ID we are working in. 
    :type project_id: str 
    :param source_uri: Google Storage location where file comes form. 
    :type source_uri: str 
    :param target_dir: Local file location where files are to be stored. 
    :type target_dir: str 
    :return: None 
    :rtype: None 
    client = storage.Client(project=project_id) 

    bucket_name = source_uri.split("gs://")[1].split("/")[0] 
    file_path = "/".join(source_uri.split("gs://")[1].split("/")[1::]) 
    bucket = client.lookup_bucket(bucket_name) 

    folder_name = "/".join(file_path.split("/")[0:-1]) + "/" 
    blobs = [o for o in bucket.list_blobs() if o.name.startswith(folder_name)] 

    # get files if we wanted just files 
    blob_name = file_path.split("/")[-1] 
    if blob_name != "*": 
     print("Getting just the file '{}'".format(file_path)) 
     our_blobs = [o for o in blobs if o.name.endswith(blob_name)] 
     print("Getting all files in '{}'".format(folder_name)) 
     our_blobs = blobs 

    print([o.name for o in our_blobs]) 

    for blob in our_blobs: 
     filename = os.path.join(target_dir, blob.name) 

     # create a complex folder structure if necessary 
     if not os.path.isdir(os.path.dirname(filename)): 

     with open(filename, 'wb') as f: 

def local_to_df(data_path): 
    """Import local data files into a single pandas dataframe. 

    :param data_path: File or folder path where csv data are located. 
    :type data_path: str 
    :return: Pandas dataframe containing data from data_path. 
    :rtype: pd.DataFrame 
    # if data_dir is a file, then just load it into pandas 
    if os.path.isfile(data_path): 
     print("Loading '{}' into a dataframe".format(data_path)) 
     df = pd.read_csv(data_path, header=1) 
    elif os.path.isdir(data_path): 
     files = [os.path.join(data_path, fi) for fi in os.listdir(data_path)] 
     print("Loading {} into a single dataframe".format(files)) 
     df = pd.concat((pd.read_csv(s) for s in files)) 
     raise ValueError(
      "Please enter a valid path. {} does not exist.".format(data_path) 

    return df 

if __name__ == '__main__': 
    PROJECT_ID = "my-project" 
    DATASET_ID = "bq_dataset" 
    TABLE_ID = "bq_table" 
    STORAGE_URI = "gs://my-bucket/path/for/dropoff/*" 
    LOCAL_DATA_PATH = "/path/to/save/" 


これはpython 2.7ではできませんか? – Keith


Pythonで可能です2.7 – Flair


@Flair Apologies。私はそれがPython 3のみであるという印象の下で走ってきましたが、私はPython 2 virtualenvでそれをテストしました。それはうまく動作します。 – Roman


編集:私は私の他の回答ででこれを行うための適切な方法を掲載しました。 Googleのストレージに最初にデータを落としてください。この方法では、データが大きすぎることはありません。

[OK]を、私はパンダでそれを行うための直接的な方法を見つけられませんでしたので、私は通常のAPIで少し余分を書かなければなりませんでした。 のpython3 google.cloud API経由でこれを行うための適切な方法を投稿することを決めた

sProjectID = "project-id" 
sQuery = ''' 
     column1, column2 
    FROM [dataset_name.tablename] 

df = create_dataframe(sQuery, sProjectID, bLargeResults=True) 

#*******Functions to make above work********* 

def create_dataframe(sQuery, sProjectID, bLargeResults=False): 
    "takes a BigQuery sql query and returns a Pandas dataframe" 

    if bLargeResults: 
     oService = create_service() 
     dDestinationTable = run_query(sQuery, oService, sProjectID) 
     df = pandas_get_table(dDestinationTable) 
     df = pandas_query(sQuery, sProjectID) 

    return df 

def pandas_query(sQuery, sProjectID): 
    "go into bigquery and get the table with sql query and return dataframe" 
    from pandas.io import gbq 
    df = gbq.read_gbq(sQuery, sProjectID) 

    return df 

def pandas_get_table(dTable): 
    "fetch a table and return dataframe" 
    from pandas.io import gbq 

    sProjectID = dTable['projectId'] 
    sDatasetID = dTable['datasetId'] 
    sTableID = dTable['tableId'] 
    sQuery = "SELECT * FROM [{}.{}]".format(sDatasetID, sTableID) 

    df = gbq.read_gbq(sQuery, sProjectID) 

    return df 

def create_service(): 
    "create google service" 
    from oauth2client.client import GoogleCredentials 
    from apiclient.discovery import build 
    credentials = GoogleCredentials.get_application_default() 
    oService = build('bigquery', 'v2', credentials=credentials) 
    return oService 

def run_query(sQuery, oService, sProjectID): 
    "runs the bigquery query" 

    dQuery = { 
     'configuration': { 
      'query': { 
       'writeDisposition': 'OVERWRITE', 
       'useQueryCache': False, 
       'allowLargeResults': True, 
       'query': sQuery, 
       'destinationTable': { 
        'projectId': sProjectID, 
        'datasetId': 'sandbox', 
        'tableId': 'api_large_result_dropoff', 

    job = oService.jobs().insert(projectId=sProjectID, body=dQuery).execute() 

    return job['configuration']['query']['destinationTable'] 

これは有望です。しかし、私はエラーが発生する理由:notFound、メッセージ:見つからない:テーブルmy_project_id:sandbox.api_large_result_dropoff。このコードを実行する前に、前もって措置を講じる必要がありますか? – marcopah


BigQueryプロジェクト(これは 'my_project_id'と呼ばれるようです)とそのプロジェクトのデータセット(上記の' sandbox'と呼ばれます)が必要です。 – Roman


中間テーブルに保存した元のクエリの結果が依然として大きい場合、これは機能しません。 –



pd.read_gbq(query, 'my-super-project', dialect='standard') 


AllowLargeResults:標準のSQLクエリの場合、このフラグは 無視され、大きな結果が常に許可されています。


特定のサイズのしきい値を超える宛先テーブルを指定する必要があります。 –