2016-06-30 14 views
0

this postと似たような問題を解決しようとしています。私の元のデータは、いくつかのセンサーの値(観測値)を含むテキストファイルです。各観測はタイムスタンプで与えられますが、センサ名は各行ではなく1回だけ与えられます。しかし、1つのファイルに複数のセンサーがあります。PySpark:newAPIHadoopFileを使用して複数行レコードのテキストファイルを読み込み、マップしてから減らす

Time MHist::852-YF-007 
2016-05-10 00:00:00 0 
2016-05-09 23:59:00 0 
2016-05-09 23:58:00 0 
2016-05-09 23:57:00 0 
2016-05-09 23:56:00 0 
2016-05-09 23:55:00 0 
2016-05-09 23:54:00 0 
2016-05-09 23:53:00 0 
2016-05-09 23:52:00 0 
2016-05-09 23:51:00 0 
2016-05-09 23:50:00 0 
2016-05-09 23:49:00 0 
2016-05-09 23:48:00 0 
2016-05-09 23:47:00 0 
2016-05-09 23:46:00 0 
2016-05-09 23:45:00 0 
2016-05-09 23:44:00 0 
2016-05-09 23:43:00 0 
2016-05-09 23:42:00 0 
Time MHist::852-YF-008 
2016-05-10 00:00:00 0 
2016-05-09 23:59:00 0 
2016-05-09 23:58:00 0 
2016-05-09 23:57:00 0 
2016-05-09 23:56:00 0 
2016-05-09 23:55:00 0 
2016-05-09 23:54:00 0 
2016-05-09 23:53:00 0 
2016-05-09 23:52:00 0 
2016-05-09 23:51:00 0 
2016-05-09 23:50:00 0 
2016-05-09 23:49:00 0 
2016-05-09 23:48:00 0 
2016-05-09 23:47:00 0 
2016-05-09 23:46:00 0 
2016-05-09 23:45:00 0 
2016-05-09 23:44:00 0 
2016-05-09 23:43:00 0 
2016-05-09 23:42:00 0 

したがって、センサ情報が与えられた行でファイルを分割するようにHadoopを設定します。これらの行からセンサー名(852-YF-007、852-YF-008など)を読み取り、MapReduceを使用して各センサーの値を読み取ります。

は、私は(Jupyterノート)Pythonでこれをやった:

sheet = sc.newAPIHadoopFile(
    '/user/me/sample.txt', 
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', 
    'org.apache.hadoop.io.LongWritable', 
    'org.apache.hadoop.io.Text', 
    conf={'textinputformat.record.delimiter': 'Time\tMHist'} 
) 

sf = sheet.filter(lambda (k, v): v) 
sf.map(lambda (k, v): v).splitlines()) 

sf.take(50) 

出力は次のようなものです:

[[u'::852-YF-007\t', 
    u'2016-05-10 00:00:00\t0', 
    u'2016-05-09 23:59:00\t0', 
    u'2016-05-09 23:58:00\t0', 
    u'2016-05-09 23:57:00\t0', 
    u'2016-05-09 23:56:00\t0', 
    u'2016-05-09 23:55:00\t0', 
    u'2016-05-09 23:54:00\t0', 
    u'2016-05-09 23:53:00\t0', 
    u'2016-05-09 23:52:00\t0', 
    u'2016-05-09 23:51:00\t0', 
    u'2016-05-09 23:50:00\t0', 
    u'2016-05-09 23:49:00\t0', 
    u'2016-05-09 23:48:00\t0', 
    u'2016-05-09 23:47:00\t0', 
    u'2016-05-09 23:46:00\t0', 
    u'2016-05-09 23:45:00\t0', 
    u'2016-05-09 23:44:00\t0', 
    u'2016-05-09 23:43:00\t0', 
    u'2016-05-09 23:42:00\t0'], 
[u'::852-YF-008\t', 
    u'2016-05-10 00:00:00\t0', 
    u'2016-05-09 23:59:00\t0', 
    u'2016-05-09 23:58:00\t0', 
    u'2016-05-09 23:57:00\t0', 
    u'2016-05-09 23:56:00\t0', 
    u'2016-05-09 23:55:00\t0', 
    u'2016-05-09 23:54:00\t0', 
    u'2016-05-09 23:53:00\t0', 
    u'2016-05-09 23:52:00\t0', 
    u'2016-05-09 23:51:00\t0', 
    u'2016-05-09 23:50:00\t0', 
    u'2016-05-09 23:49:00\t0', 
    u'2016-05-09 23:48:00\t0', 
    u'2016-05-09 23:47:00\t0', 
    u'2016-05-09 23:46:00\t0', 
    u'2016-05-09 23:45:00\t0', 
    u'2016-05-09 23:44:00\t0', 
    u'2016-05-09 23:43:00\t0', 
    u'2016-05-09 23:42:00\t0']] 

私の質問で、さらにセンサー名を抽出するために、これを処理する方法となりますそのセンサーの値ライン。やや気に入った

852-YF-007 --> array of sensor_lines 
852-YF-008 --> array of sensor_lines 

次に、行自体がタイムスタンプと値に分割されます。しかし、センサーの名前を線から分けることにもっと興味があります。

答えて

1

個人的に私はだろう:空きエントリ

をフィルタリング

values = sheet.values() 
    • ::

      sheet = sc.newAPIHadoopFile(
          path, 
          'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', 
          'org.apache.hadoop.io.LongWritable', 
          'org.apache.hadoop.io.Text', 
          conf={'textinputformat.record.delimiter': 'Time\tMHist::'} 
      ) 
      
    • ドロップキーとデリミタを拡張します

      non_empty = values.filter(lambda x: x) 
      
    • スプリット:

      grouped_lines = non_empty.map(str.splitlines) 
      
    • 別々のキーと値:

      from operator import itemgetter 
      
      pairs = grouped_lines.map(itemgetter(0, slice(1, None))) 
      
    • 、最終的には分割値:

      pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs]) 
      

    すべてのことをして行わことができますもちろん、単一の機能:

    import dateutil.parser 
    
    def process(pair): 
        _, content = pair 
        clean = [x.strip() for x in content.strip().splitlines()] 
        if not clean: 
         return [] 
        k, vs = clean[0], clean[1:] 
        for v in vs: 
         try: 
          ds, x = v.split("\t") 
          yield k, (dateutil.parser.parse(ds), float(x)) # or int(x) 
         except ValueError: 
          pass 
    
    sheet.flatMap(process) 
    
  • +0

    興味深いことに、より明示的にすることができます。 – zero323

    関連する問題