2017-02-12 12 views
3

DirectRunnerでこのコードを実行すると問題なく動作します。 DataflowRunnerで、それはでクラッシュ:マニュアルの指示に従って、WriteToTextはDirectRunnerで動作しますが、DataflowRunnerのTypeErrorで失敗します

TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles']`

私のapache-ビーム-SDKは、マスターからクローニングし、構築されています。それはapache-beam-sdk==0.6.0.dev0として構築されます。私はバージョンの疑いがある(しかし、私は思う)私は最近のバージョンの変更なしでコードの変更を見た(NewDoFnが消えたが、バージョンは変更されませんでした)。

問題の原因はわかりませんが、インストールされているsdkとdataflowコンテナに不一致があるようです。 DirectRunnerelementを直接DoFn.process()に渡し、DataflowRunnercontextに直接渡す別の不一致型エラーが発生します。

私は、できるだけ簡単なコードにこれを分離することを試みた:

import uuid 
import apache_beam.utils.pipeline_options 
import apache_beam as beam 

runner = 'DataflowRunner' 
# runner = 'DirectRunner' 

options = beam.utils.pipeline_options.PipelineOptions() 
gcloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions) 
gcloud_options.job_name = 'a' + str(uuid.uuid4()) 
gcloud_options.project = 'your-project' 
gcloud_options.staging_location = 'gs://your-bucket/beam/staging' 
gcloud_options.temp_location = 'gs://your-bucket/beam/temp' 
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = runner 

p = beam.Pipeline(options=options) 
(p 
| 'some_strings' >> beam.Create(tuple('asdfqwert')) 
| 'write_text' >> beam.io.WriteToText('strings', file_name_suffix='.txt') 
) 
p.run().wait_until_finish() 

全出力を:

No handlers could be found for logger "oauth2client.contrib.multistore_file" 
/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/coders/typecoders.py:136: UserWarning: Using fallback coder for typehint: Any. 
    warnings.warn('Using fallback coder for typehint: %r.' % typehint) 
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead. 
Collecting google-cloud-dataflow==0.5.1 
    Using cached google-cloud-dataflow-0.5.1.tar.gz 
    Saved /var/folders/v3/61xx4nnn6p36n5m9fp4qdwtr0000gn/T/tmpuCWoeh/google-cloud-dataflow-0.5.1.tar.gz 
Successfully downloaded google-cloud-dataflow 
Traceback (most recent call last): 
    File "reproduce_bug.py", line 28, in <module> 
    p.run().wait_until_finish() 
    File "/Users/john/miniconda3/envs/py2/lib/python2.7/site-packages/apache_beam/runners/dataflow_runner.py", line 706, in wait_until_finish 
    (self.state, getattr(self._runner, 'last_error_msg', None)), self) 
apache_beam.runners.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error: 
(70278eb56b40fd94): Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 514, in do_work 
    work_executor.execute() 
    File "dataflow_worker/executor.py", line 899, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:26452) 
    op.start() 
    File "dataflow_worker/executor.py", line 191, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7575) 
    def start(self): 
    File "dataflow_worker/executor.py", line 196, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7480) 
    with self.spec.source.reader() as reader: 
    File "dataflow_worker/executor.py", line 206, in dataflow_worker.executor.ReadOperation.start (dataflow_worker/executor.c:7425) 
    self.output(windowed_value) 
    File "dataflow_worker/executor.py", line 136, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:5749) 
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
    File "dataflow_worker/executor.py", line 83, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:3884) 
    cython.cast(Operation, consumer).process(windowed_value) 
    File "dataflow_worker/executor.py", line 505, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:15525) 
    self.dofn_receiver.receive(o) 
    File "apache_beam/runners/common.py", line 163, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:4862) 
    self.process(windowed_value) 
    File "apache_beam/runners/common.py", line 270, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7749) 
    self.reraise_augmented(exn) 
    File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:8108) 
    raise type(exn), args, sys.exc_info()[2] 
    File "apache_beam/runners/common.py", line 268, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7660) 
    self.old_dofn_process(element) 
    File "apache_beam/runners/common.py", line 173, in apache_beam.runners.common.DoFnRunner.old_dofn_process (apache_beam/runners/common.c:5182) 
    self._process_outputs(element, self.dofn_process(self.context)) 
    File "apache_beam/runners/common.py", line 152, in apache_beam.runners.common.DoFnRunner.__init__.lambda3 (apache_beam/runners/common.c:3640) 
    self.dofn_process = lambda context: fn.process(context, *args) 
TypeError: process() takes exactly 4 arguments (3 given) [while running 'write_text/Write/WriteImpl/WriteBundles'] 

答えて

1

あなたのスタックの先頭を参照してください(ご使用の環境にバージョン0.5.1がインストールされているようですトレース)、しかしあなたはpython repoのHEADでビルドしています。

正しいバージョンのSDKを持つ新しいvirtualenv環境を作成できます。

  • Python HEADに対して実行する場合は、パイプラインを実行するときにsdk_locationフラグを設定する必要があります。
  • リリースバージョンに対して実行する場合は、pip install google-cloud-dataflowを使用してSDKをインストールし、パイプラインを正常に実行します。 (好ましくは仮想環境でvirtualenvを使用して)

注釈は、リリースされたバージョンを使用する場合にはおそらく最適です。

+0

これは意味があります。私はどこに/私は違いを得るためにセットアップに間違っていたのだろうか。私は今日それを試してみる。 – KobeJohn

+0

'virtualenv venvをうまくやっているはずです。ソースvenv/bin/activate; pip install apache_beam' – Pablo

+0

私は 'apache_beam'というパッケージやpypiと似たパッケージを見ません。私はそれを見逃しましたか? 'pip install google-cloud-dataflow'の結果は' google-cloud-dataflow-0.5.5'となります。あなたの説明によれば、デフォルトのパイプラインバージョンと一致しないビーム0.5.5を使用していることが示唆されます。 Apacheのビームレポジトリ自体のインストール手順は、HEADという結果になるレポを複製するように言います。私は何かを見逃しているのですか、またはドキュメント+レポ+ピピのすべてがバージョン管理と同期していませんか? P.S.コンドミアライフ4! – KobeJohn

関連する問題