Below is a sample of the function:
 
def batch_operation(df, batch_id):
    """Validate one batch of rows and append it to the base, error and audit tables.

    The ``(df, batch_id)`` signature matches a ``foreachBatch`` callback.
    NOTE(review): confirm against the caller.

    Steps:
      1. Stamp every row with a ``load_ctl_key`` column whose value is ``aws_lck``.
      2. When ``condition`` is truthy, add a checksum text column via
         ``create_chk_sum_txt``.
      3. Split rows into invalid ones and valid ones. A row is invalid when
         ``id`` is null, ``id`` is longer than 50 chars, ``cd`` is longer than
         10 chars, or a non-null ``5yr`` does not cast to int. Every other row
         is valid.
      4. Append valid rows to ``catalog.schema.<trgt_tbl>`` and invalid rows
         to ``catalog.schema.<err_tbl>``.
      5. If ``int(load_ctl_key) != -1``, append a one-row count summary to
         ``catalog.schema.audit``.

    Relies on names defined outside this function: ``logger``, ``spark``,
    ``display``, ``aws_lck``, ``condition``, ``col_info``, ``exclude_columns``,
    ``create_chk_sum_txt``, ``trgt_tbl``, ``err_tbl``, ``load_ctl_key``,
    ``job_id`` and the column helpers ``col``, ``lit``, ``length``, ``when``.

    Args:
        df: DataFrame holding the rows of this batch.
        batch_id: Batch identifier; logged and written to the audit row.
    """
    logger.info(f"batch_id: {batch_id}")

    # Work on the rows passed in. The previous version read the global
    # ``src_table_df`` here and never used ``df``.
    # NOTE(review): the column value comes from ``aws_lck`` while the audit
    # gate below uses ``load_ctl_key`` -- confirm these are meant to differ.
    df = df.withColumn('load_ctl_key', lit(aws_lck))

    # Optionally add the checksum text column.
    if condition:
        df2 = create_chk_sum_txt(df=df, cols=col_info, exc_cols=exclude_columns)
    else:
        df2 = df

    # df2 feeds several counts and two table writes; cache it so the batch's
    # lineage is evaluated once rather than once per action.
    df2.persist()
    try:
        src_cnt = df2.count()

        is_invalid = (
            col("id").isNull()
            | (length(col("id")) > 50)
            | (length(col("cd")) > 10)
            | (col("5yr").isNotNull() & col("5yr").cast("int").isNull())
        )

        # Rows where the predicate is true go to the error table.
        df3 = df2.filter(is_invalid)
        logger.info("df3 completed")

        df4 = df3.withColumn(
            "error_reason",
            when(col("id").isNull(), "id is Null")
            .when(length(col("id")) > 50, "id exceeds column size in base table")
            .otherwise("Miscellaneous Error"),
        )

        # Valid rows: the predicate is false OR null (e.g. a null ``cd`` makes
        # ``length(cd) > 10`` null, and filter() drops null). This is the same
        # set ``df2.exceptAll(df3)`` returned, without a full-row comparison.
        df5 = df2.filter(~is_invalid | is_invalid.isNull()).withColumn(
            "error_reason", lit("")
        )

        ins_cnt = df5.count()
        filtr_cnt = src_cnt - ins_cnt

        logger.info(f'src_cnt: {src_cnt}')
        logger.info(f'ins_cnt: {ins_cnt}')
        logger.info(f'filtr_cnt: {filtr_cnt}')

        select_df = df5.select(*col_info)
        logger.info("select_df completed")

        # NOTE(review): unless ``col_info`` contains "error_reason", this
        # select drops the reason computed for df4 -- confirm the error
        # table is meant to omit it.
        err_df = df4.select(*col_info)
        logger.info("err_df completed")

        logger.info("Final Schema before writing to UC:")
        select_df.printSchema()
        err_df.printSchema()

        # Valid rows go to the base table (previously an undefined
        # ``transformed_df`` was written here).
        select_df.write.mode("append").saveAsTable(f"catalog.schema.{trgt_tbl}")
        err_df.write.mode("append").saveAsTable(f"catalog.schema.{err_tbl}")
    finally:
        df2.unpersist()

    if int(load_ctl_key) != -1:
        logger.info("Pushing metrics to UC")

        payload = {
            "load_ctl_key": load_ctl_key,
            "job_id": job_id,
            "batch_id": batch_id,
            "table": trgt_tbl,
            "src_cnt": src_cnt,
            "filter_cnt": filtr_cnt,
            "ins_cnt": ins_cnt,
        }
        logger.info("Payload: {}".format(payload))

        # Build the one-row DataFrame straight from the dict. The former
        # spark.read.json(sc.parallelize([payload])) handed the JSON reader
        # str(dict) -- a Python repr, not JSON -- and needed the RDD API.
        payload_df = spark.createDataFrame([payload])
        display(payload_df)

        payload_df.write.mode("append").saveAsTable("catalog.schema.audit")