apache spark - PySpark DataFrame aggregation: count on different conditions


I want to do an aggregation (count) on 2 different conditions and assign the results to 2 different columns. Can anyone suggest an easy method to do this?

In the solution I have tried below, the counts come out the same even though the filter conditions are different:

num_ins_rec_cnt = f.count(col("ins_upd_flag") == "i").alias("ins_rec_cnt")
num_upd_rec_cnt = f.count(col("ins_upd_flag") == "u").alias("upd_rec_cnt")
delta_process_max_ld_df = (
    cdc_all_record_sk_ld_df
    .agg(f.max('delta_account_sk_id').alias("max_account_sk_id"), num_ins_rec_cnt, num_upd_rec_cnt)
    .withColumn("lkp_process_name", lit(process_name))
    .withColumn("history_tbl_cnt", lit(base_rec_count))
    .withColumn("delta_tbl_cnt", lit(delta_rec_count))
)

The output is:

+-----------------+-----------+-----------+--------------------+---------------+-------------+
|max_account_sk_id|ins_rec_cnt|upd_rec_cnt|    lkp_process_name|history_tbl_cnt|delta_tbl_cnt|
+-----------------+-----------+-----------+--------------------+---------------+-------------+
|            25099|       5100|       5100|amc_account_delta_ld|          19999|        20099|
+-----------------+-----------+-----------+--------------------+---------------+-------------+

But it should have been:

+-------+---------------+--+
|  _c0  | ins_upd_flag  |
+-------+---------------+--+
| 5100  | i             |
| 5000  | u             |

Sample data:

+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
|delta_acct_nbr|delta_account_sk_id|delta_zip_code|delta_primary_state|delta_eff_start_date|delta_eff_end_date|      delta_load_tm|      delta_hash_key|delta_eff_flag|ins_upd_flag|
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+
|   id330020000|              20000|         02345|                 ca|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19dedd4f9a55845e8...|             y|           i|
|   id330020001|              20001|         02345|                 ca|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19dedd4f9a55845e8...|             y|           i|
|   id330020002|              20002|         02345|                 ca|          2016-11-10|        3099-12-31|2016-11-10 14:53:52|19dedd4f9a55845e8...|             y|

I have solved the problem in the following way:

delta_process_max_ld_df = (
    cdc_all_record_sk_ld_df
    .withColumn('ins_upd_flag_cnt',
                f.when(cdc_all_record_sk_ld_df.ins_upd_flag == 'i', 1)
                 .when(cdc_all_record_sk_ld_df.ins_upd_flag == 'u', 0)
                 .otherwise(0))
    .agg(f.max('delta_account_sk_id').alias("max_surrogate_id"),
         f.sum('ins_upd_flag_cnt').alias("insert_record_cnt"),
         f.count('*').alias("ins_upd_count"))
    .withColumn("process_name", lit(process_name))
    .withColumn("process_run_date", lit(load_dt))
    .withColumn("base_tbl_cnt", lit(base_rec_count))
    .withColumn("delta_tbl_cnt", lit(delta_rec_count))
    .withColumn("load_tm", lit(load_tm))
    .withColumn("tbl_name", lit(tgt_tbl_nm))
    .withColumn("load_date", lit(process_run_date))
)  # .select(*status_tbl_columns)
delta_process_max_ld_df1 = (
    delta_process_max_ld_df
    .withColumn("upd_record_cnt",
                lit(delta_process_max_ld_df.ins_upd_count - delta_process_max_ld_df.insert_record_cnt))
    .select(*status_tbl_columns)
)

