1
以下は多かれ少なかれストレートなPythonコードです。私がデータフレーム内でフィルタリングしている列のデータスキーマは、基本的にjson文字列です。Pyspark:データフレーム列のjson文字列を変換する方法
しかし、私はこのために必要なメモリを大幅に増やさなければならず、1つのノードでしか動作しません。収集を使用することはおそらく悪く、単一のノードでこれをすべて作成することは実際にはSparkの分散された性質を利用していません。
私は、より多くのスパーク中心のソリューションをお勧めしたいと思います。スパークをよりうまく利用するために、誰かが私の下の論理をマッサージするのを助けることができますかまた、学習のポイントとして、更新がなぜそれを改善するのか理由を説明してください。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from pyspark.sql.types import SchemaStruct, SchemaField, StringType
input_schema = SchemaStruct([
SchemaField('scrubbed_col_name', StringType(), nullable=True)
])
output_schema = SchemaStruct([
SchemaField('val01_field_name', StringType(), nullable=True),
SchemaField('val02_field_name', StringType(), nullable=True)
])
example_input = [
'''[{"val01_field_name": "val01_a", "val02_field_name": "val02_a"},
{"val01_field_name": "val01_a", "val02_field_name": "val02_b"},
{"val01_field_name": "val01_b", "val02_field_name": "val02_c"}]''',
'''[{"val01_field_name": "val01_c", "val02_field_name": "val02_a"}]''',
'''[{"val01_field_name": "val01_a", "val02_field_name": "val02_d"}]''',
]
desired_output = {
'val01_a': ['val_02_a', 'val_02_b', 'val_02_d'],
'val01_b': ['val_02_c'],
'val01_c': ['val_02_a'],
}
def capture(dataframe):
# Capture column from data frame if it's not empty
data = dataframe.filter('scrubbed_col_name != null')\
.select('scrubbed_col_name')\
.rdd\
.collect()
# Create a mapping of val1: list(val2)
mapping = {}
# For every row in the rdd
for row in data:
# For each json_string within the row
for json_string in row:
# For each item within the json string
for val in json.loads(json_string):
# Extract the data properly
val01 = val.get('val01_field_name')
val02 = val.get('val02_field_name')
if val02 not in mapping.get(val01, []):
mapping.setdefault(val01, []).append(val02)
return mapping