
Writing to BigQuery from within a ParDo function

I would like to call the beam.io.Write(beam.io.BigQuerySink(..)) operation from within a ParDo function in order to generate a separate BigQuery table for each key of a PCollection (I am using the Python SDK). Here are two similar threads, which unfortunately did not help:

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

2) Dynamic table name when writing to BQ from dataflow pipelines

When I run the code below, the rows for the first key are inserted into BigQuery and then the pipeline fails with the error shown further down. I would really appreciate any suggestions on what I am doing wrong or how to fix it.

Pipeline code:

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query)) 

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
            beam.io.BigQuerySink(
                'PROJECT-NAME:analytics.first_table', # will be replaced by a dynamic name based on key
                schema=schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
        ### End block ###
        return [value]


### Following part works fine ### 
filtered = (rows | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics') 
        | 'apply_projection' >> beam.Map(apply_projection, projection_fields) 
        | 'group_by_key' >> beam.GroupByKey() 
        | 'par_upload_to_bigquery' >> beam.ParDo(par_upload()) 
        | 'flat_map' >> beam.FlatMap(lambda l: l) #this step is just for testing 
       ) 

### This part works fine if I comment out the 'write_to_bq' block above 
filtered | 'write_to_bq' >> beam.io.Write(
     beam.io.BigQuerySink(
      'PROJECT-NAME:analytics.another_table', 
      schema=schema, 
      write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, 
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) 
     ) 

Error message:

INFO:oauth2client.client:Attempting refresh to obtain initial access_token 
INFO:oauth2client.client:Attempting refresh to obtain initial access_token 
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table. 
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})} 
ERROR:root:Error while visiting par_upload_to_bigquery 
Traceback (most recent call last): 
    File "split_events.py", line 137, in <module> 
    run() 
    File "split_events.py", line 132, in run 
    p.run() 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run 
    return self.runner.run(self) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run 
    super(DirectPipelineRunner, self).run(pipeline) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run 
    pipeline.visit(RunVisitor(self)) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit 
    self._root_transform().visit(visitor, self, visited) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit 
    part.visit(visitor, pipeline, visited) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit 
    visitor.visit_transform(self) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform 
    self.runner.run_transform(transform_node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform 
    return m(transform_node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper 
    func(self, pvalue, *args, **kwargs) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo 
    runner.process(v) 
    File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483) 
    File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311) 
    File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677) 
    File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process 
    return self.run(self.dofn.process, context, args, kwargs) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run 
    result = method(context, *args, **kwargs) 
    File "split_events.py", line 73, in process 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__ 
    return self.transform.__ror__(pvalueish, self.label) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__ 
    return _MaterializePValues(cache).visit(result) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit 
    return self._pvalue_cache.get_unwindowed_pvalue(node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue 
    return [v.value for v in self.get_pvalue(pvalue)] 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue 
    value_with_refcount = self._cache[self.key(pvalue)] 
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']" 

EDIT (after the first answer):

I had not realized that my value needs to be a PCollection.

I have now changed my code to the following (which is probably very inefficient):


key_pipe = p | 'pipe_' + key >> beam.Create(value) 
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..)) 

This now works fine locally, but not with the BlockingDataflowPipelineRunner :-( The pipeline fails with the following error:

JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work 
    work_executor.execute() 
    File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331) 
    op.start() 
    File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193) 
    def start(self): 
    File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499) 
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5) 

I think the first answer is correct. You cannot add further steps to the pipeline from within a 'DoFn' that is running as part of that pipeline. The fact that this works with the DirectRunner is a bug. If you want to do this kind of data-dependent write, you should for now interact with the BigQuery API directly rather than using BigQuerySink, as the other thread suggests. –

Answers


The only way I got a BigQuery write to work inside a ParDo was to use the BigQuery API directly, via a client library.

The code you wrote puts a beam.io.BigQuerySink() inside the DoFn of a Dataflow ParDo. A ParDo is meant to be applied to a PCollection, like filtered in your working code example. That is not the case for the non-working code, which operates on value, a plain Python value.

I think the simplest option is to look at the gcloud-python BigQuery function insert_data() and call it inside your ParDo.
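
For illustration, here is a minimal sketch of that suggestion, assuming the gcloud-python (early google-cloud-bigquery) client in which Table.insert_data() performs a streaming insert, and assuming each grouped value is already a list of tuples matching the target table's schema. The DoFn name per_key_upload and the project and dataset names are placeholders, not part of the original code:

import apache_beam as beam
from google.cloud import bigquery  # gcloud-python BigQuery client

class per_key_upload(beam.DoFn):
    """Write each (key, rows) element to its own BigQuery table via the
    client API, bypassing beam.io.BigQuerySink entirely."""

    def process(self, context):
        key, rows = context.element
        client = bigquery.Client(project='PROJECT-NAME')
        table = client.dataset('analytics').table(key)  # one table per key
        table.reload()  # fetch table metadata, including its schema
        # rows is assumed to be a list of tuples in schema order
        errors = table.insert_data(rows)  # streaming insert through the API
        if errors:
            raise RuntimeError('BigQuery insert failed: %s' % errors)
        return [key]

It would be applied after the GroupByKey in place of the failing step, e.g. ... | 'par_upload_to_bigquery' >> beam.ParDo(per_key_upload()). Note that this sketch creates a client per element and assumes the per-key tables already exist; streaming inserts do not create tables, so missing tables would have to be created separately (for example with table.create() and an explicit schema) before inserting.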
