2016-12-22 2 views
5

既存の列に基づいて新しいSpark DF MapType列を作成する必要があります。列名がキーで、値は値です。例としてpyspark:既存の列からMapType列を作成

- 私はこのDFをしました:

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6), 
         ('d23d', 1.5, 2.0, 2.2), 
         ('as3d', 2.2, 4.3, 9.0) 
          ]) 
schema = StructType([StructField('key', StringType(), True), 
        StructField('metric1', FloatType(), True), 
        StructField('metric2', FloatType(), True), 
        StructField('metric3', FloatType(), True)]) 
df = sqlContext.createDataFrame(rdd, schema) 

+----+-------+-------+-------+ 
| key|metric1|metric2|metric3| 
+----+-------+-------+-------+ 
|123k| 1.3| 6.3| 7.6| 
|d23d| 1.5| 2.0| 2.2| 
|as3d| 2.2| 4.3| 9.0| 
+----+-------+-------+-------+ 

私はこれまでのところ、私はこのことからstructTypeを作成することができますすでにだ:

nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric") 
df2 = df.select("key", nameCol) 

+----+-------------+ 
| key|  metric| 
+----+-------------+ 
|123k|[1.3,6.3,7.6]| 
|d23d|[1.5,2.0,2.2]| 
|as3d|[2.2,4.3,9.0]| 
+----+-------------+ 

しかし、私は必要なのメトリック列でありますキーが列名の場合はMapTypeです。

+----+-------------------------+ 
| key|     metric| 
+----+-------------------------+ 
|123k|Map(metric1 -> 1.3, me...| 
|d23d|Map(metric1 -> 1.5, me...| 
|as3d|Map(metric1 -> 2.2, me...| 
+----+-------------------------+ 

どのようにデータを変換できますか?

ありがとうございます!

答えて

8

Spark 2.0以降では、create_mapを使用できます。まず、いくつかの輸入:

metric = create_map(list(chain(*(
    (lit(name), col(name)) for name in df.columns if "metric" in name 
)))).alias("metric") 

selectで使用:例では

df.select("key", metric) 

from pyspark.sql.functions import lit, col, create_map 
from itertools import chain 

create_mapは、次のように例のために作成することができkeysvaluesのインターリーブされたシーケンスを期待結果は次のようになります。

+----+---------------------------------------------------------+ 
|key |metric             | 
+----+---------------------------------------------------------+ 
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)  | 
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)  | 
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)  | 
+----+---------------------------------------------------------+ 

あなたはスパークの以前のバージョンを使用している場合は、UDFを使用する必要があります:

次のように使用することができ
from pyspark.sql import Column 
from pyspark.sql.functions import struct 
from pyspark.sql.types import DataType 

def as_map(*cols: str, key_type: DataType=DoubleType()) -> Column: 
    args = [struct(lit(name), col(name)) for name in cols] 
    as_map_ = udf(
     lambda *args: dict(args), 
     MapType(StringType(), key_type) 
    ) 
    return as_map_(*args) 

df.select("key", 
    as_map(*[name for name in df.columns if "metric" in name]).alias("metric")) 
+0

あなたのソリューションが良さそうだ、それを使用することができます答えてください:https://stackoverflow.com/questions/45445077/pyspark-spark-dataframe-aggregate-columns-in-map-type? –

関連する問題