2017-10-13 6 views
2

Google App Engine Webserviceを実行せずにユーザーデータを処理するスクリプトを実行できますか?小さなスクリプトでwebserviceなしの大きな質問cronジョブ

それがうまく動作しますが、私のスクリプトは、最後のおよそ40分、私はエラーを取得していたとき:DeadlineExceededError

私の一時的な修正は、Pythonスクリプトを使用してWindows VMおよびコマンドラインでWindowsスケジューラを使用する

ました編集:コードが追加されました

jobs = [] 
jobs_status = [] 
jobs_error = [] 
# The project id whose datasets you'd like to list 
PROJECT_NUMBER = 'project' 
scope = ('https://www.googleapis.com/auth/bigquery', 
     'https://www.googleapis.com/auth/cloud-platform', 
     'https://www.googleapis.com/auth/drive', 
     'https://spreadsheets.google.com/feeds') 

credentials = ServiceAccountCredentials.from_json_keyfile_name('client_secrets.json', scope) 

# Create the bigquery api client 
service = googleapiclient.discovery.build('bigquery', 'v2', credentials=credentials) 

def load_logs(source): 
    body = {"rows": [ 
     {"json": source} 
    ]} 

    response = service.tabledata().insertAll(
     projectId=PROJECT_NUMBER, 
     datasetId='test', 
     tableId='test_log', 
     body=body).execute() 
    return response 

def job_status(): 
    for job in jobs: 
     _jobId = job['jobReference']['jobId'] 
     status = service.jobs().get(projectId=PROJECT_NUMBER, jobId=_jobId).execute() 
     jobs_status.append(status['status']['state']) 
     if 'errors' in status['status'].keys(): 
      query = str(status['configuration']['query']['query']) 
      message = str(status['status']['errorResult']['message']) 
      jobs_error.append({"query": query, "message": message}) 
    return jobs_status 


def check_statues(): 
    while True: 
     if all('DONE' in job for job in job_status()): 
      return 


def insert(query, tableid, disposition): 
    job_body = { 
    "configuration": { 
     "query": { 
     "query": query, 
     "useLegacySql": True, 
     "destinationTable": { 
     "datasetId": "test", 
     "projectId": "project", 
     "tableId": tableid 
     }, 
     "writeDisposition": disposition 
     } 
    } 
    } 

    r = service.jobs().insert(
     projectId=PROJECT_NUMBER, 
     body=job_body).execute() 
    jobs.append(r) 
    return r 



class MainPage(webapp2.RequestHandler): 
    def get(self): 
     query = "SELECT * FROM [gdocs_users.user_empty]" 
     insert(query, 'users_data_p1', "WRITE_TRUNCATE") 
     check_statues() 
     query = "SELECT * FROM [gdocs_users.user_empty]" 
     insert(query, 'users_data_p2', "WRITE_TRUNCATE") 
     query = "SELECT * FROM [gdocs_users.user_%s]" 
     for i in range(1, 1000): 
      if i <= 600: 
       insert(query % str(i).zfill(4), 'users_data_p1', "WRITE_APPEND") 
      else: 
       insert(query % str(i).zfill(4), 'user_data_p2', "WRITE_APPEND") 
     for error in jobs_error: 
      load_logs(error) 


app = webapp2.WSGIApplication([ 
    ('/', MainPage), 
], debug=True) 

答えて

2

デフォルトでは、App Engineサービスはautomatic scalingを使用します。これは、HTTP要求に対して60秒の制限があり、タスクキュー要求に対して10分の制限があります。基本スケーリングまたは手動スケーリングを使用するようにサービスを変更した場合、タスクキュー要求は最大24時間実行できます。

この作業には1つのインスタンスしか必要ない可能性があるので、デフォルトのサービスに加えて、おそらく2番目のserviceを作成しているようです。サブフォルダ内の1つのインスタンスの最大と基本的なスケーリングを使用して、次のapp.yaml設定でbqserviceフォルダを作成します。

# bqsservice/app.yaml 
# Possibly use a separate service for your BQ code than 
# the rest of your app: 
service: bqservice 
runtime: python27 
api_version: 1 
# Keep low memory/cost B1 class? 
instance_class: B1 
# Limit max services to 1 to keep costs down. There is an 
# 8 instance hour limit to the free tier. This option still 
# scales to 0 when not in use. 
basic_scaling: 
    max_instances: 1 

# Handlers: 
handlers: 
- url: /.* 
    script: main.app 

次に実行するためのスクリプトをスケジュールするために、同じサービスでcron.yamlを作成します。もし上記の私の例の構成は、その中に定義されたWSGIアプリでmain.pyファイルにあなたのBigQueryロジックを置くと:

# bqservice/main.py 
import webapp2 

class CronHandler(webapp2.RequestHandler): 

    def post(self): 
     # Handle your cron work 
     # .... 

app = webapp2.WSGIApplication([ 
    #('/', MainPage), # If you needed other handlers 
    ('/mycron', CronHandler), 
], debug=True) 

あなたが使う予定がない場合は、すべてのデフォルトのサービスにこれを仕事ができますApp Engineアプリで何か他のアプリデフォルトのサービスに加えてこれを行う場合は、静的なファイルを持つ単純なapp.yamlであっても、まずデフォルトのサービスに何かを展開する必要があります。

+0

ありがとうBrettJ!この解決策はうまくいくかもしれませんが、App EngineはBigQueryの復帰を待つ間に何の仕事もしていないことに注意してください。より良い方法は、ジョブがエンキューされるとすぐに戻ってから、後で戻ってジョブが完了したかどうかを確認することです。 –

+0

@BrettJ助けてくれてありがとう。私も追加したいと思っている人がいれば、 'cron.yaml'に' target:your_service'を追加して、使用したいサービスを判断する必要があります – Mat

0

ほとんどのBigQuery操作は非同期に実行できます。私たちにあなたのコードを教えてもらえますか? PythonのBigQueryのドキュメントから例えば

、:

def query(query): 
    client = bigquery.Client() 
    query_job = client.run_async_query(str(uuid.uuid4()), query) 

    query_job.begin() 
    query_job.result() # Wait for job to complete 

非同期の仕事だし、コードを完了するために、クエリを待つために選んでいます。待つ代わりに、begin()の後に仕事のIDを取得します。 Task Queueで後で実行するタスクをエンキューして、そのジョブの結果を確認できます。

+0

私はPython APIで動作していないフェデレーションされたテーブル(gsuitから)を持っているため、サービスを作成しなければなりませんでした。私はジョブを挿入して非同期にそれらを実行します。 – Mat

+0

ええ、あなたは 'check_statues () 'を返します。その後、ジョブが挿入されるとすぐに関数が戻ります。その後、そのジョブIDの結果を確認するために後で戻ってきてください。 –

+0

しかし私は以下のいくつかの質問があり、私は挿入が完了したときにそれらを実行しています。私が読んだところでは、タイムアウトを要求する60秒後にドキュメントのエラーが発生するので、アプリエンジンはそのような種類のスクリプトには適していますか、それとも小さなバックエンドのものですか? – Mat