2016-11-10 11 views
-2

2つの異なる条件で集計(カウント)し、2つの異なる列に割り当てようとしています。誰も私に簡単な方法を提案することはできますか?Pysparkデータフレームの集計が異なる条件で

私はフィルタ条件が異なっていてもカウントが同じ来て試してみましたが、それ以下ソリューション、

num_ins_rec_cnt = F.count(col("ins_upd_flag") == "I").alias("ins_rec_cnt") 
num_upd_rec_cnt = F.count(col("ins_upd_flag") == "U").alias("upd_rec_cnt") 
delta_process_max_ld_df = cdc_all_record_sk_ld_df.agg(F.max('delta_account_sk_id').alias("max_account_sk_id"),(num_ins_rec_cnt),(num_upd_rec_cnt)).withColumn("lkp_process_name",lit(process_name)).withColumn("history_tbl_cnt",lit(base_rec_count)).withColumn("delta_tbl_cnt",lit(delta_rec_count)) 

では、出力は、

+-----------------+-----------+-----------+--------------------+---------------+-------------+ 
|max_account_sk_id|ins_rec_cnt|upd_rec_cnt| lkp_process_name|history_tbl_cnt|delta_tbl_cnt| 
+-----------------+-----------+-----------+--------------------+---------------+-------------+ 
|   25099|  5100|  5100|amc_account_delta_ld|   19999|  20099| 
+-----------------+-----------+-----------+--------------------+---------------+-------------+ 

である。しかし、それは、

されている必要があります
+-------+---------------+--+ 
| _c0 | ins_upd_flag | 
+-------+---------------+--+ 
| 5100 | I    | 
| 5000 | U    | 

Sample Data: 
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+ 
|delta_acct_nbr|delta_account_sk_id|delta_zip_code|delta_primary_state|delta_eff_start_date|delta_eff_end_date|  delta_load_tm|  delta_hash_key|delta_eff_flag|ins_upd_flag| 
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+ 
| ID330020000|    20000|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y|   I| 
| ID330020001|    20001|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y|   I| 
| ID330020002|    20002|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y| 
+0

はい..正しい..私は別の列としてテーブルに記述する必要があります。私はフィルター出力を2つの別々の列が必要です。 – user3858193

答えて

-1

私は以下の方法でprobを解決しました。

delta_process_max_ld_df = cdc_all_record_sk_ld_df.withColumn( 'ins_upd_flag_cnt'、**** F.when(cdc_all_record_sk_ld_df.ins_upd_flag == 'I' は、1).when(cdc_all_record_sk_ld_df.ins_upd_flag == 'U'、0).otherwise( 0)).agg( 'del_account_sk_id')エイリアス( "max_surrogate_id")、F.sum( 'ins_upd_flag_cnt')エイリアス( "insert_record_cnt")、F.count( '*')。エイリアス( "process_run_date"、lit(load_dt))).Column( "base_tbl_cnt"、lit(base_rec_count))。withColumn( "delta_tbl_cnt"、lit())) .withColumn( "process_name"、lit(process_name) (* status_tbl_columns) delta_process_max_ld_df1 =(delta_rec_count)).Column( "load_date"、lit(process_run_date))).Column( "load_tm"、lit(load_tm) delta_process_max_ld_df.withColu MN( "upd_record_cnt"、点灯(delta_process_max_ld_df.ins_upd_count - delta_process_max_ld_df.insert_record_cnt)) .select(* status_tbl_columns)

関連する問題