0

PySpark 2.0で実行したい操作はdf.rdd.mapとして実行しやすいが、Dataframe実行エンジンのパフォーマンス上の理由から、Dataframe操作のみを使用してこれを行う方法を見つけたいと思います。Spark Dataframeカラムのデータを条件として使用するか、別のカラム式に入力する

操作、RDD-スタイルで、このようなものされています。基本的に、私は私の文字列の書式設定操作の精度がどうあるべきか、行ごとに、私に語ったコラムを

def precision_formatter(row): 
    formatter = "%.{}f".format(row.precision) 
    return row + [formatter % row.amount_raw/10 ** row.precision] 
df = df.rdd.map(precision_formatter) 

を持っており、私は、その精度に応じて、 'amount_raw'列を選択的に文字列としてフォーマットしたいと思います。

答えて

0

1つ以上の列の内容を別の列操作の入力として使用する方法がわかりません。最も近いのは、Column.whenの使用を、列内の可能なブール条件/ケースのセットに対応する、ブール演算の外部定義セットで示唆することです。

たとえば、可能なすべての値がrow.precisionである場合は、そのセットを反復処理して、セット内の各値に対してColumn.when操作を適用できます。私はこのセットがdf.select('precision').distinct().collect()で得られると信じています。 pyspark.sql.functions.whenColumn.when操作自体はColumnオブジェクトを返す

ので、あなたがセットを使い果たしてしまうまで、あなたはプログラムでお互いにwhen事業「を付加」(ただし、それが得られた)セット内の項目を反復処理しておくことができます。

import pyspark.sql.functions as PSF 

def format_amounts_with_precision(df, all_precisions_set): 
    amt_col = PSF.when(df['precision'] == 0, df['amount_raw'].cast(StringType())) 
    for precision in all_precisions_set: 
     if precision != 0: # this is a messy way of having a base case above 
      fmt_str = '%.{}f'.format(precision) 
      amt_col = amt_col.when(df['precision'] == precision, 
          PSF.format_string(fmt_str, df['amount_raw']/10 ** precision) 

    return df.withColumn('amount', amt_col) 
0

これはPython UDFで行うことができます。彼らは多くの入力値(行の列からの値)をとり、単一の出力値を吐き出すことができます。代わりに、列の精度値を使用すると、グローバルなものを使用したい場合は、このようにそれを呼び出すときに点灯(..)関数を使用することができ、

from pyspark.sql import types as T, functions as F 
from pyspark.sql.function import udf, col 

# Create example data frame 
schema = T.StructType([ 
    T.StructField('precision', T.IntegerType(), False), 
    T.StructField('value', T.FloatType(), False) 
]) 

data = [ 
    (1, 0.123456), 
    (2, 0.123456), 
    (3, 0.123456) 
] 

rdd = sc.parallelize(data) 
df = sqlContext.createDataFrame(rdd, schema) 

# Define UDF and apply it 
def format_func(precision, value): 
    format_str = "{:." + str(precision) + "f}" 
    return format_str.format(value) 

format_udf = F.udf(format_func, T.StringType()) 

new_df = df.withColumn('formatted', format_udf('precision', 'value')) 
new_df.show() 

も:それは次のようになり

new_df = df.withColumn('formatted', format_udf(F.lit(2), 'value')) 
関連する問題