2017-01-26 11 views
0

Google bigqueryの日付パーティションに大量の履歴データを整理する必要があります。それはあなたのための負荷日付を分割します(現在の日付のみ)が、それは本当に履歴データを助けません。私が今見た唯一の解決策は、Googleがツールをもう少し構築するまで日付フラグを使用して日付ごとに手動で行うことです。どんな解決策ですか?Google Cloud Storageの履歴データをPythonを使用して日付分割されたbigqueryテーブルに移動する

+2

実際の質問で分割して質問に投稿して投稿することをお勧めします(既に素晴らしい回答があるようです)。質問の一部ではない)。 [How to Ask](http://stackoverflow.com/help/how-to-ask)も読むことができます。 –

+0

これは役に立つポストのように思えると思います。自己回答型の質問の例として、http://stackoverflow.com/questions/41638651/how-do-i-update-a-nested-record-in-bigquery-using-dml-syntaxを参照してください。 –

+0

提案に感謝します:) – Ralph

答えて

0

私は自分のパイプラインを作成し、それを以下に含めました。それを実行するには、この投稿のすべてのコードブロックをまとめてください。

import datetime 
from subprocess import call 

start = datetime.date(year = 2016, month = 10, day = 24) 
#end = datetime.date(year = 2016, month = 10, day = 01) 
end = datetime.date.today() 
file_type = ['activity', 'click', 'impression', 'rich_media']  
dataset = 'dataset_name' 

ベロースクリプトは、1つのGCSバケットから別のGCSバケットにファイルをコピーします。グーグルダブルクリックファイルは、ファイル名に作成日を持って、このスクリプトは、作成日は、ファイルを配置するためにどのような日付パーティションを決定することを使用しています。

#pull all the files into our own buckets on gcs so that we dont lose data after 3 months 
call("gsutil -m cp -r gs://bucket/path/to/mysource* gs://mybucket/path/to/mydata, shell=True) 

これはすぐに過去のデータを配布するための最良の方法のように思えました。個々のファイルを開き、各行を適切なパーティションに配置したいのですが、どのようにすればよいかわかりません。

#create list of dates based on stat and end date supplied by user at begining of script 
def daterange(start_date, end_date): 
    if start_date <= end_date: 
     for n in range((end_date - start_date).days + 1): 
      yield start_date + datetime.timedelta(n) 
    else: 
     for n in range((start_date - end_date).days + 1): 
      yield start_date - datetime.timedelta(n) 

私は一番下にエラー処理を除いて/しようと追加が、私は、テーブル名は、それがジョブを作成します台無しにされている場合、呼び出し中にエラーが発生することはありませんので、それは実際には何もないとは思いませんサーバー側のエラーですが、実際にプロセスを停止したり、何かを傷つけることはありません。

--nosyncフラグを使用すると、もともと私はpopenを使用していた非同期ジョブを呼び出すことができますが、popenはそれ以降にクリーンアップするとは思いません。

#creates a bunch of jobs to upload GCS files into GBQ partitioned tables 
for bucket in file_type: 
    for date in daterange(start, end): 
     date = str(date).replace('-','') 
     source = 'gs://mybucket/path/to/mydata' + '*' 
     table = bucket + '$' + date 
     try: 
      process = call("bq --nosync load --skip_leading_rows=1 --replace dev."\ 
          + table + ' ' + source, shell=True) 
     except: 
      print 'missing ' + bucket + ' data for ' + date 
関連する問題