2016-11-14 11 views
0

異なる方法で集約各フィールド:Pyspark集約 - Iは、4つの基本的なフィールド有するいくつかのデータ持っ

  1. フィールド1 2は、すべてのユニークな値の集合であるべきである
  2. フィールドデータのキーでありますこのフィールド
  3. フィールド3の最小値は

オリジナルコードはこのようになります(タイムスタンプ)

  • フィールド4が最大値である(タイムスタンプ)です。

    data = (
        dataframe 
        .rdd 
        # flatten rows 
        .flatMap(lambda x: x) 
        # Parse JSON 
        .flatMap(lambda x: encode_json(x)) 
        # Capture values 
        .map(lambda x: [ 
         # Merge 'field1', 'field2' --> 'field1, field2' 
         ','.join(_ for _ in [x.get('metadata_value'), x.get('field2')]), 
         # Create pairing of low and high timestamps 
         [x.get('low'), x.get('high')] 
        ]) 
        # Aggregate into a list of low/high timestamps per 'field1, field2' 
        .aggregateByKey(list(), lambda u, v: u + [v], lambda u1, u2: u1 + u2) 
        # Flatten keys 'ip,guid' --> 'ip', 'guid' 
        .map(lambda x: (x[0].split(',')[0], x[0].split(',')[1], x[1], sum(1 for _ in x[1]))) 
        # Reduce timestamps to single values: [s1, e1], [s2, e2], ... --> s_min, e_max 
        .map(lambda x: (x[0], x[1], min(_[0] for _ in x[2]), max(_[1] for _ in x[2]), x[3])) 
    ) 
    

    オリジナル出力は次のようになります。

    a | x| 20160103 | 20160107 
    a | x013579 | 20160101 | 20160106 
    

    新しい出力は次のようになります。

    a | {x,x013579} | 20160101 | 20160107 
    
  • 答えて

    1

    はペアRDDにマップするために、あなたの現在の出力には、この2つの変換を追加し、それに対応する操作(辞書、分、最大)によって各フィールドを縮小します。

    data.map(lambda reg: (reg[0],[reg[1],reg[2],reg[3]])) .reduceByKey(lambda v1,v2: ({v1[0],v2[0]},min(v1[1],v2[1]), max(v1[2],v2[2])))