apache spark - PySpark DataFrame aggregation: count on different conditions
I need an aggregation (count) on 2 different conditions, and I am trying to assign the results to 2 different columns. Can anyone suggest an easy method to do this?
In the solution I have tried below, the counts come out the same even though the filter conditions are different:
    from pyspark.sql import functions as f
    from pyspark.sql.functions import col, lit

    # Intended: one count per flag value (this is the version that gives identical counts)
    num_ins_rec_cnt = f.count(col("ins_upd_flag") == "i").alias("ins_rec_cnt")
    num_upd_rec_cnt = f.count(col("ins_upd_flag") == "u").alias("upd_rec_cnt")

    delta_process_max_ld_df = (
        cdc_all_record_sk_ld_df
        .agg(
            f.max("delta_account_sk_id").alias("max_account_sk_id"),
            num_ins_rec_cnt,
            num_upd_rec_cnt,
        )
        .withColumn("lkp_process_name", lit(process_name))
        .withColumn("history_tbl_cnt", lit(base_rec_count))
        .withColumn("delta_tbl_cnt", lit(delta_rec_count))
    )

The output is:
+-----------------+-----------+-----------+--------------------+---------------+-------------+
|max_account_sk_id|ins_rec_cnt|upd_rec_cnt|    lkp_process_name|history_tbl_cnt|delta_tbl_cnt|
+-----------------+-----------+-----------+--------------------+---------------+-------------+
|            25099|       5100|       5100|amc_account_delta_ld|          19999|        20099|
+-----------------+-----------+-----------+--------------------+---------------+-------------+

But it should have been:
+-------+---------------+--+
|  _c0  | ins_upd_flag  |
+-------+---------------+--+
| 5100  |               |
| 5000  | u             |

Sample data:

+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
|delta_acct_nbr|delta_account_sk_id|delta_zip_code|delta_primary_state|delta_eff_start_date|delta_eff_end_date|      delta_load_tm|      delta_hash_key|delta_eff_flag|ins_upd_flag|
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
|   id330020000|              20000|         02345|                 ca|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19dedd4f9a55845e8...|             y|           i|
|   id330020001|              20001|         02345|                 ca|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19dedd4f9a55845e8...|             y|           i|
|   id330020002|              20002|         02345|                 ca|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19dedd4f9a55845e8...|             y|
I have solved the problem in the following way.
    # Mark inserts with 1 and everything else with 0, so that sum() gives the insert count
    delta_process_max_ld_df = (
        cdc_all_record_sk_ld_df
        .withColumn(
            "ins_upd_flag_cnt",
            f.when(cdc_all_record_sk_ld_df.ins_upd_flag == "i", 1)
            .when(cdc_all_record_sk_ld_df.ins_upd_flag == "u", 0)
            .otherwise(0),
        )
        .agg(
            f.max("delta_account_sk_id").alias("max_surrogate_id"),
            f.sum("ins_upd_flag_cnt").alias("insert_record_cnt"),
            f.count("*").alias("ins_upd_count"),
        )
        .withColumn("process_name", lit(process_name))
        .withColumn("process_run_date", lit(load_dt))
        .withColumn("base_tbl_cnt", lit(base_rec_count))
        .withColumn("delta_tbl_cnt", lit(delta_rec_count))
        .withColumn("load_tm", lit(load_tm))
        .withColumn("tbl_name", lit(tgt_tbl_nm))
        .withColumn("load_date", lit(process_run_date))
    )  # .select(*status_tbl_columns)

    # Update count = total rows minus insert rows
    delta_process_max_ld_df1 = (
        delta_process_max_ld_df
        .withColumn(
            "upd_record_cnt",
            lit(delta_process_max_ld_df.ins_upd_count - delta_process_max_ld_df.insert_record_cnt),
        )
        .select(*status_tbl_columns)
    )