2016-11-16 20 views

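How do I merge two columns with a condition in PySpark? I want to merge columns whose names differ only by case (k with K, v with V), but only when their values differ, as shown in the tables below.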

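from pyspark.sql.functions import col, concat, lit, sort_array, split

# Note: with Spark's default case-insensitive name resolution, "k" and "K" may be
# reported as ambiguous; setting spark.sql.caseSensitive to true is one way around that.
df = sqlContext.createDataFrame([("foo", "bar", "too", "aaa"), ("bar", "bar", "aaa", "foo")], ("k", "K", "v", "V"))
columns = df.columns

k = 0
for i in range(len(columns)):
    for j in range(i + 1, len(columns)):
        # Pair up columns whose names differ only by case
        if columns[i].lower() == columns[j].lower():
            k = k + 1
            merged = columns[i] + str(k)
            # Join the two values with a comma, then split and sort them
            df = df.withColumn(merged, concat(col(columns[i]), lit(","), col(columns[j])))
            newdf = df.select(col("k"), split(col(merged), r",\s*").alias("c1"))
            sortDf = newdf.select(newdf.k, sort_array(newdf.c1).alias("sorted_c1"))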

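I was able to merge and sort the values, but I could not work out the condition that skips the merge when the two values are equal: [foo, bar] should be merged, but [bar, bar] should not.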

Input:

+---+---+---+---+ 
|  k|  K|  v|  V|
+---+---+---+---+ 
|foo|bar|too|aaa| 
|bar|bar|aaa|foo| 
+---+---+---+---+ 

Expected output:

+---+---+---------+---------+
|  k|  K| Merged K| Merged V|
+---+---+---------+---------+
|foo|bar|[foo,bar]|[too,aaa]|
|bar|bar|      bar|[aaa,foo]|
+---+---+---------+---------+

Answer


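Try a UDF that de-duplicates the values and only builds a list when more than one distinct value remains: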

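from pyspark.sql.functions import udf

def merge(*c):
    # Drop duplicate values and sort what is left
    merged = sorted(set(c))
    if len(merged) == 1:
        # All inputs were equal: return the single value unchanged
        return merged[0]
    else:
        # Otherwise format the distinct values as "[a,b]"
        return "[{0}]".format(",".join(merged))

# udf() defaults to a string return type, which matches merge()
merge_udf = udf(merge)

# Distinct column names (k1, k2, v1, v2) avoid relying on case-sensitive resolution
df = sqlContext.createDataFrame([("foo", "bar", "too", "aaa"), ("bar", "bar", "aaa", "foo")], ("k1", "k2", "v1", "v2"))

df.select(merge_udf("k1", "k2"), merge_udf("v1", "v2"))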
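
To see what the UDF body returns without starting Spark, the same logic can be run on the two sample rows in plain Python. This is only a sketch: `merge` repeats the function above, while `merge_keep_order` is a hypothetical variant (not part of the answer) that assumes Python 3.7+ so that `dict.fromkeys` keeps first-seen order. It may be useful if the unsorted order shown in the question's expected output matters.

```python
def merge(*c):
    # Same logic as the UDF body above: de-duplicate, then sort.
    merged = sorted(set(c))
    if len(merged) == 1:
        return merged[0]
    return "[{0}]".format(",".join(merged))


def merge_keep_order(*c):
    # Hypothetical variant: de-duplicate but keep first-seen order
    # (relies on dicts preserving insertion order, Python 3.7+).
    merged = list(dict.fromkeys(c))
    if len(merged) == 1:
        return merged[0]
    return "[{0}]".format(",".join(merged))


# The two sample rows from the question: (k, K, v, V)
rows = [("foo", "bar", "too", "aaa"), ("bar", "bar", "aaa", "foo")]

sorted_result = [(merge(k1, k2), merge(v1, v2)) for k1, k2, v1, v2 in rows]
ordered_result = [
    (merge_keep_order(k1, k2), merge_keep_order(v1, v2))
    for k1, k2, v1, v2 in rows
]

print(sorted_result)
print(ordered_result)
```

With the sorted version, the first row comes out as `[bar,foo]` and `[aaa,too]`, which differs in order from the table in the question; the order-preserving variant gives `[foo,bar]` and `[too,aaa]`. Either function can be wrapped with `udf(...)` in the same way as `merge` above.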