0

Google Dataflow(apache-beam)を使用して小さなテストを設定しました。実験のユースケースは、(csv)ファイルを取り、選択した列を(txt)ファイルに書き込むことです。以下のサンプル1のGoogle Dataflowが1000番目のレコードを削除しているようです

from __future__ import absolute_import 

import argparse 
import logging 
import re 

import apache_beam as beam 
from apache_beam.io import ReadFromText 
from apache_beam.io import WriteToText 
from apache_beam.metrics import Metrics 
from apache_beam.metrics.metric import MetricsFilter 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import SetupOptions 

class EmitColDoFn(beam.DoFn): 
    first = True 
    header = "" 
    def __init__(self, i): 
     super(EmitColDoFn, self).__init__() 
     self.line_count = Metrics.counter(self.__class__, 'lines') 
     self.i = i 

    def process(self, element): 
     if self.first: 
      self.header = element 
      self.first = False 
     else: 
      self.line_count.inc() 
      cols = re.split(',', element) 
      return (cols[self.i],) 

def run(argv=None): 
    """Main entry point; defines and runs the wordcount pipeline.""" 
    parser = argparse.ArgumentParser() 
    parser.add_argument('--input', 
         dest='input', 
         default='/users/sms/python_beam/data/MOCK_DATA (4).csv', 
#      default='gs://dataflow-samples/shakespeare/kinglear.txt', 
         help='Input file to process.') 
    parser.add_argument('--output', 
         dest='output', 
         default="https://stackoverflow.com/users/sms/python_beam/data/", 
#      required=True, 
         help='Output file to write results to.') 
    known_args, pipeline_args = parser.parse_known_args(argv) 

    pipeline_options = PipelineOptions(pipeline_args) 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    p = beam.Pipeline(options=pipeline_options) 

    # Read the text file[pattern] into a PCollection. 
    lines = p | 'read' >> ReadFromText(known_args.input) 

    column = (lines 
      | 'email col' >> (beam.ParDo(EmitColDoFn(3))) 
      | "col file" >> WriteToText(known_args.output, ".txt", shard_name_template="SS_Col")) 

    result = p.run() 
    result.wait_until_finish() 

    if (not hasattr(result, 'has_job') # direct runner 
     or result.has_job): # not just a template creation 
     lines_filter = MetricsFilter().with_name('lines') 
     query_result = result.metrics().query(lines_filter) 
     if query_result['counters']: 
      lines_counter = query_result['counters'][0] 

     print "Lines committed", lines_counter.committed 
run() 

最後の数行:

実験のためのコードを以下に示すよう

/usr/local/bin/python2.7 
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py 
No handlers could be found for logger "oauth2client.contrib.multistore_file" 
Lines committed 996 

Process finished with exit code 0 

990,Corabel,Feldbau,[email protected],Female,84.102.162.190,DJ 
991,Kiley,Rottcher,[email protected],Male,91.97.155.28,CA 
992,Glenda,Clist,[email protected],Female,24.98.253.127,UA 
993,Ingunna,Maher,[email protected],Female,159.31.127.19,PL 
994,Megan,Giacopetti,[email protected],Female,115.6.63.52,RU 
995,Briny,Dutnall,[email protected],Female,102.81.33.24,SE 
996,Jan,Caddan,[email protected],Female,115.142.222.106,PL 

これはの期待出力を生成する実行

今、奇妙な結果があります。次の実行では、行数は1000

994,Megan,Giacopetti,[email protected],Female,115.6.63.52,RU 
995,Briny,Dutnall,[email protected],Female,102.81.33.24,SE 
996,Jan,Caddan,[email protected],Female,115.142.222.106,PL 
997,Shannen,Gaisford,[email protected],Female,167.255.222.92,RU 
998,Lorianna,Slyne,[email protected],Female,54.169.60.13,CN 
999,Franklin,Yaakov,[email protected],Male,122.1.92.236,CN 
1000,Wilhelmine,Cariss,[email protected],Female,237.48.113.255,PL 

に増加している。しかし、この時間は、アウトプットは、出力ファイルの

/usr/local/bin/python2.7 
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py 
No handlers could be found for logger "oauth2client.contrib.multistore_file" 
Lines committed 999 

Process finished with exit code 0 

検査は、最後の行が処理されなかったことを示しています。

[email protected] 
[email protected] 
[email protected] 
[email protected] 
[email protected] 

何が起こっているのですか?

答えて

5

「EditColDoFn」は、ファイルごとに1つのインスタンスがあると仮定して、最初の行をスキップします。 1000行以上の場合、DirectRunnerは2つのバンドルを作成します.1つ目は1000行、もう1つは1行です。 Beamアプリケーションでは、入力を複数のバンドルに分割して並列処理することができます。ファイル数とバンドル数には相関がありません。同じアプリケーションでは、複数のファイルにまたがってテラバイトのデータを処理できます。

ReadFromTextには 'skip_header_lines'というオプションがあります。このオプションを使用すると、各入力ファイルのヘッダー行をスキップできます。

関連する問題