2016-06-16 6 views
14

テーブルの一部の列を行に転置しようとしています。 私はPythonとSpark 1.5.0を使用しています。ここに私の最初の表には、次のとおりです。Sparkで列を行に転記する

+-----+-----+-----+-------+ 
| A |col_1|col_2|col_...| 
+-----+-------------------+ 
| 1 | 0.0| 0.6| ... | 
| 2 | 0.6| 0.7| ... | 
| 3 | 0.5| 0.9| ... | 
| ...| ...| ...| ... | 

私はこのような気にいらがしたい:

+-----+--------+-----------+ 
| A | col_id | col_value | 
+-----+--------+-----------+ 
| 1 | col_1|  0.0| 
| 1 | col_2|  0.6| 
| ...|  ...|  ...|  
| 2 | col_1|  0.6| 
| 2 | col_2|  0.7| 
| ...|  ...|  ...| 
| 3 | col_1|  0.5| 
| 3 | col_2|  0.9| 
| ...|  ...|  ...| 

誰かが、私はそれを行うことができますサンザシを知っていますか?ご協力ありがとうございました。

答えて

21

基本的なSpark SQL関数では比較的簡単です。

Pythonの

from pyspark.sql.functions import array, col, explode, struct, lit 

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"]) 

def to_long(df, by): 

    # Filter dtypes and split into column names and type description 
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by)) 
    # Spark SQL supports only homogeneous columns 
    assert len(set(dtypes)) == 1, "All columns have to be of the same type" 

    # Create and explode an array of (column_name, column_value) structs 
    kvs = explode(array([ 
     struct(lit(c).alias("key"), col(c).alias("val")) for c in cols 
    ])).alias("kvs") 

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"]) 

to_long(df, ["A"]) 

スカラ

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.functions.{array, col, explode, lit, struct} 

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2") 

def toLong(df: DataFrame, by: Seq[String]): DataFrame = { 
    val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip 
    require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")  

    val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _* 
)) 

    val byExprs = by.map(col(_)) 

    df 
    .select(byExprs :+ kvs.alias("_kvs"): _*) 
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*) 
} 

toLong(df, Seq("A")) 
+4

私はそれが "比較的"単純だとは思わない:) –

2

フラットマップを使用します。以下のようなものが

from pyspark.sql import Row 

def rowExpander(row): 
    rowDict = row.asDict() 
    valA = rowDict.pop('A') 
    for k in rowDict: 
     yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]}) 

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)) 
+0

は、あなたの答えをいただき、ありがとうございます。しかし、それは動作しません。 ** TypeError:タプルのインデックスは、strではなく整数でなければなりません** – Raouf

3

スパークローカル線形代数ライブラリは現在、非常に弱い動作するはずです:彼らは、上記のように基本的な操作が含まれていません。

これをSpark 2.1用に修正するためのJIRAがありますが、今日はには役立ちません。

何かを考慮する:転置を実行すると、データを完全にシャッフルする必要があります。

今のところ、RDDコードを直接書く必要があります。私はtransposeをscalaで書いたが、pythonでは書いていない。

def transpose(mat: DMatrix) = { 
    val nCols = mat(0).length 
    val matT = mat 
     .flatten 
     .zipWithIndex 
     .groupBy { 
     _._2 % nCols 
    } 
     .toSeq.sortBy { 
     _._1 
    } 
     .map(_._2) 
     .map(_.map(_._1)) 
     .toArray 
    matT 
    } 

だから、あなたが使用するためのpythonにそれを変換することができます:ここではscalaバージョンです。私はこの特定の瞬間に書く/テストするための帯域幅を持っていません:あなたがその変換を行うことができなかったかどうか教えてください。

少なくとも、次は容易にpythonに変換されます。

  • zipWithIndex - >enumerate()(Pythonの同等 - @ zero323にクレジット)
  • map - >[someOperation(x) for x in ..]
  • groupBy - ここ>itertools.groupBy()

flattenための実装ですPythonに相当するものはありません。

解決策をまとめておく必要があります。

+0

ありがとうございます。私はスカラを知らないが、あなたのコードを理解しようとするだろう。私はあなたに知らせてくれるでしょう。 – Raouf

+0

@Raouf上記のコードはすべてPythonで同等の機能を持っています。あなたがよくpythonを知っているなら、問題はないはずです。私はPythonから唯一欠けている 'flatten'を示しました。私に教えてください;) – javadba

+1

'zipWithIndex' - >' enumerate() '(Pythonに相当)? – zero323

1

私はScalaはその@javadbaに答えるましたがDataFrame内のすべての列を転置のためのPythonのバージョンを書いて、作成しました。これは、OPが求めていたものとは少し違うかもしれません...例えば

from itertools import chain 
from pyspark.sql import DataFrame 


def _sort_transpose_tuple(tup): 
    x, y = tup 
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0] 


def transpose(X): 
    """Transpose a PySpark DataFrame. 

    Parameters 
    ---------- 
    X : PySpark ``DataFrame`` 
     The ``DataFrame`` that should be tranposed. 
    """ 
    # validate 
    if not isinstance(X, DataFrame): 
     raise TypeError('X should be a DataFrame, not a %s' 
         % type(X)) 

    cols = X.columns 
    n_features = len(cols) 

    # Sorry for this unreadability... 
    return X.rdd.flatMap(# make into an RDD 
     lambda xs: chain(xs)).zipWithIndex().groupBy(# zip index 
     lambda val_idx: val_idx[1] % n_features).sortBy(# group by index % n_features as key 
     lambda grp_res: grp_res[0]).map(# sort by index % n_features key 
     lambda grp_res: _sort_transpose_tuple(grp_res)).map(# maintain order 
     lambda key_col: key_col[1]).toDF() # return to DF 

:実装する

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF() 
>>> X.show() 
+---+---+---+ 
| _1| _2| _3| 
+---+---+---+ 
| 1| 2| 3| 
| 4| 5| 6| 
| 7| 8| 9| 
+---+---+---+ 

>>> transpose(X).show() 
+---+---+---+ 
| _1| _2| _3| 
+---+---+---+ 
| 1| 4| 7| 
| 2| 5| 8| 
| 3| 6| 9| 
+---+---+---+ 
1

非常に便利な方法:

from pyspark.sql import Row 

def rowExpander(row): 
    rowDict = row.asDict() 
    valA = rowDict.pop('A') 
    for k in rowDict: 
     yield Row(**{'A': valA , 'colID' : k, 'colValue' : row[k]}) 

    newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)