2016-09-07 41 views
1

以下は多かれ少なかれストレートなPythonコードです。私がデータフレーム内でフィルタリングしている列のデータスキーマは、基本的にjson文字列です。Pyspark:データフレーム列のjson文字列を変換する方法

しかし、私はこのために必要なメモリを大幅に増やさなければならず、1つのノードでしか動作しません。収集を使用することはおそらく悪く、単一のノードでこれをすべて作成することは実際にはSparkの分散された性質を利用していません。

私は、より多くのスパーク中心のソリューションをお勧めしたいと思います。スパークをよりうまく利用するために、誰かが私の下の論理をマッサージするのを助けることができますかまた、学習のポイントとして、更新がなぜそれを改善するのか理由を説明してください。

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 
import json 

from pyspark.sql.types import SchemaStruct, SchemaField, StringType 


input_schema = SchemaStruct([ 
    SchemaField('scrubbed_col_name', StringType(), nullable=True) 
]) 


output_schema = SchemaStruct([ 
    SchemaField('val01_field_name', StringType(), nullable=True), 
    SchemaField('val02_field_name', StringType(), nullable=True) 
]) 


example_input = [ 
    '''[{"val01_field_name": "val01_a", "val02_field_name": "val02_a"}, 
     {"val01_field_name": "val01_a", "val02_field_name": "val02_b"}, 
     {"val01_field_name": "val01_b", "val02_field_name": "val02_c"}]''', 
    '''[{"val01_field_name": "val01_c", "val02_field_name": "val02_a"}]''', 
    '''[{"val01_field_name": "val01_a", "val02_field_name": "val02_d"}]''', 
] 

desired_output = { 
    'val01_a': ['val_02_a', 'val_02_b', 'val_02_d'], 
    'val01_b': ['val_02_c'], 
    'val01_c': ['val_02_a'], 
} 


def capture(dataframe): 
    # Capture column from data frame if it's not empty 
    data = dataframe.filter('scrubbed_col_name != null')\ 
        .select('scrubbed_col_name')\ 
        .rdd\ 
        .collect() 

    # Create a mapping of val1: list(val2) 
    mapping = {} 
    # For every row in the rdd 
    for row in data: 
     # For each json_string within the row 
     for json_string in row: 
      # For each item within the json string 
      for val in json.loads(json_string): 
       # Extract the data properly 
       val01 = val.get('val01_field_name') 
       val02 = val.get('val02_field_name') 
       if val02 not in mapping.get(val01, []): 
        mapping.setdefault(val01, []).append(val02) 
    return mapping 

答えて

2

一つの可能​​な解決策:(?あなたは本当にグループ化された値を必要としない)、この操作が正確に効率的ではありません

(df 
    .rdd # Convert to rdd 
    .flatMap(lambda x: x) # Flatten rows 
    # Parse JSON. In practice you should add proper exception handling 
    .flatMap(lambda x: json.loads(x)) 
    # Get values 
    .map(lambda x: (x.get('val01_field_name'), x.get('val02_field_name'))) 
    # Convert to final shape 
    .groupByKey()) 

考えると、出力仕様が、それでもcollectよりもはるかに良いです。

関連する問題