2017-10-03 6 views
0

私はApache Sparkのデータフレームに計算カラムのセットを持っています。データフレーム内の各行(約2000)に対して、10列の行の値を取得し、11列目の値に最も近い値を探します。Apache Spark:データフレーム内の行の値のリストのパーセンテイル

これらの行の値をとると思います。それをリストに変換し、abs値の計算を使用して最も近いものを決定します。

しかし、私は行の値をリストにする方法に固執しています。私は列を取って、collect_listを使用してこれらの値をリストに変換しましたが、リストが単一の行と複数の列から来た場合の処理​​方法は不明です。

答えて

1

あなたの計算を線形化できるように、explodeの列が必要です。

のは、サンプルのデータフレームを作成してみましょう:

import numpy as np 
np.random.seed(0) 
df = sc.parallelize([np.random.randint(0, 10, 11).tolist() for _ in range(20)])\ 
    .toDF(["col" + str(i) for i in range(1, 12)]) 
df.show() 
    +----+----+----+----+----+----+----+----+----+-----+-----+ 
    |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|col11| 
    +----+----+----+----+----+----+----+----+----+-----+-----+ 
    | 5| 0| 3| 3| 7| 9| 3| 5| 2| 4| 7| 
    | 6| 8| 8| 1| 6| 7| 7| 8| 1| 5| 9| 
    | 8| 9| 4| 3| 0| 3| 5| 0| 2| 3| 8| 
    | 1| 3| 3| 3| 7| 0| 1| 9| 9| 0| 4| 
    | 7| 3| 2| 7| 2| 0| 0| 4| 5| 5| 6| 
    | 8| 4| 1| 4| 9| 8| 1| 1| 7| 9| 9| 
    | 3| 6| 7| 2| 0| 3| 5| 9| 4| 4| 6| 
    | 4| 4| 3| 4| 4| 8| 4| 3| 7| 5| 5| 
    | 0| 1| 5| 9| 3| 0| 5| 0| 1| 2| 4| 
    | 2| 0| 3| 2| 0| 7| 5| 9| 0| 2| 7| 
    | 2| 9| 2| 3| 3| 2| 3| 4| 1| 2| 9| 
    | 1| 4| 6| 8| 2| 3| 0| 0| 6| 0| 6| 
    | 3| 3| 8| 8| 8| 2| 3| 2| 0| 8| 8| 
    | 3| 8| 2| 8| 4| 3| 0| 4| 3| 6| 9| 
    | 8| 0| 8| 5| 9| 0| 9| 6| 5| 3| 1| 
    | 8| 0| 4| 9| 6| 5| 7| 8| 8| 9| 2| 
    | 8| 6| 6| 9| 1| 6| 8| 8| 3| 2| 3| 
    | 6| 3| 6| 5| 7| 0| 8| 4| 6| 5| 8| 
    | 2| 3| 9| 7| 5| 3| 4| 5| 3| 3| 7| 
    | 9| 9| 9| 7| 3| 2| 3| 9| 7| 7| 5| 
    +----+----+----+----+----+----+----+----+----+-----+-----+ 

リストに行の値を有効にするいくつかの方法があります。

  • カラム名に等しい値に等しいキーを持つmapを作成するには対応する行の値あなたが爆発し、リストを持っていたら、あなたはの最小値を探すことができますArrayType ...

を使用してArrayType

df = df\ 
    .withColumn("id", psf.monotonically_increasing_id())\ 
    .select(
     "id", 
     psf.explode(
      psf.array([psf.struct(psf.lit(c).alias("col_name"), psf.col(c).alias("value")) 
         for c in df.columns if c != "col11"])).alias("cols"), 
     "col11").select("cols.*", "col11", "id") 
df.show() 
    +--------+-----+-----+---+ 
    |col_name|value|col11| id| 
    +--------+-----+-----+---+ 
    | col1| 5| 7| 0| 
    | col2| 0| 7| 0| 
    | col3| 3| 7| 0| 
    | col4| 3| 7| 0| 
    | col5| 7| 7| 0| 
    | col6| 9| 7| 0| 
    | col7| 3| 7| 0| 
    | col8| 5| 7| 0| 
    | col9| 2| 7| 0| 
    | col10| 4| 7| 0| 
    | col1| 6| 9| 1| 
    | col2| 8| 9| 1| 
    | col3| 8| 9| 1| 
    | col4| 1| 9| 1| 
    | col5| 6| 9| 1| 
    | col6| 7| 9| 1| 
    | col7| 7| 9| 1| 
    | col8| 8| 9| 1| 
    | col9| 1| 9| 1| 
    | col10| 5| 9| 1| 
    +--------+-----+-----+---+ 
  • 内部StructTypeを使用して

    import pyspark.sql.functions as psf 
    from itertools import chain 
    df = df\ 
        .withColumn("id", psf.monotonically_increasing_id())\ 
        .select(
         "id", 
         psf.posexplode(
          psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns if c != "col11"]))) 
         ).alias("pos", "col_name", "value"), "col11") 
    df.show() 
        +---+---+--------+-----+-----+ 
        | id|pos|col_name|value|col11| 
        +---+---+--------+-----+-----+ 
        | 0| 0| col1| 5| 7| 
        | 0| 1| col2| 0| 7| 
        | 0| 2| col3| 3| 7| 
        | 0| 3| col4| 3| 7| 
        | 0| 4| col5| 7| 7| 
        | 0| 5| col6| 9| 7| 
        | 0| 6| col7| 3| 7| 
        | 0| 7| col8| 5| 7| 
        | 0| 8| col9| 2| 7| 
        | 0| 9| col10| 4| 7| 
        | 1| 0| col1| 6| 9| 
        | 1| 1| col2| 8| 9| 
        | 1| 2| col3| 8| 9| 
        | 1| 3| col4| 1| 9| 
        | 1| 4| col5| 6| 9| 
        | 1| 5| col6| 7| 9| 
        | 1| 6| col7| 7| 9| 
        | 1| 7| col8| 8| 9| 
        | 1| 8| col9| 1| 9| 
        | 1| 9| col10| 5| 9| 
        +---+---+--------+-----+-----+ 
    
  • |col11 - value|

    from pyspark.sql import Window 
    w = Window.partitionBy("id").orderBy(psf.abs(psf.col("col11") - psf.col("value"))) 
    res = df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1") 
    res.sort("id").show() 
        +--------+-----+-----+----------+---+ 
        |col_name|value|col11|  id| rn| 
        +--------+-----+-----+----------+---+ 
        | col5| 7| 7|   0| 1| 
        | col2| 8| 9|   1| 1| 
        | col1| 8| 8|   2| 1| 
        | col2| 3| 4|   3| 1| 
        | col1| 7| 6|   4| 1| 
        | col5| 9| 9|   5| 1| 
        | col2| 6| 6|   6| 1| 
        | col10| 5| 5|   7| 1| 
        | col3| 5| 4|   8| 1| 
        | col6| 7| 7|   9| 1| 
        | col2| 9| 9|8589934592| 1| 
        | col3| 6| 6|8589934593| 1| 
        | col3| 8| 8|8589934594| 1| 
        | col2| 8| 9|8589934595| 1| 
        | col2| 0| 1|8589934596| 1| 
        | col2| 0| 2|8589934597| 1| 
        | col9| 3| 3|8589934598| 1| 
        | col7| 8| 8|8589934599| 1| 
        | col4| 7| 7|8589934600| 1| 
        | col4| 7| 5|8589934601| 1| 
        +--------+-----+-----+----------+---+ 
    
  • +0

    ありがとうございました。だから私はこれらの値のリストを持つ列を持って、このリストのパーセンタイルを計算することは可能ですか? – Dan

    +0

    ウィンドウ関数を使用して、差の絶対値の最小値を直接探すことができます。 – MaFF

    関連する問題