Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
07-02-2024 09:17 AM
Below is a sample of the code:
def batch_operation(df, batch_id):
    """Validate one batch, load valid and rejected rows, then record metrics.

    Rows that fail the key/size/type checks are appended to the error table
    with an ``error_reason`` column; all remaining rows are appended to the
    target table. When ``load_ctl_key`` is not ``-1``, a one-row metrics
    record is appended to ``catalog.schema.audit``.

    Relies on names defined outside this function: ``logger``, ``spark``,
    ``sc``, ``display``, ``aws_lck``, ``load_ctl_key``, ``job_id``,
    ``condition``, ``col_info``, ``exclude_columns``, ``trgt_tbl``,
    ``err_tbl``, ``create_chk_sum_txt`` and the Spark column functions
    ``lit``, ``col``, ``length`` and ``when``.

    Args:
        df: DataFrame holding the rows of this batch.
        batch_id: Identifier of the batch; logged and stored in the metrics
            row.
    """
    logger.info(f"batch_id: {batch_id}")

    # Tag the rows of *this* batch. The earlier version rebuilt ``df`` from
    # the outer ``src_table_df``, which discarded the DataFrame passed in.
    # NOTE(review): confirm ``src_table_df`` was not the intended input.
    df = df.withColumn('load_ctl_key', lit(aws_lck))

    # Checksum text column is generated and added to the DataFrame.
    if condition:
        df2 = create_chk_sum_txt(df=df, cols=col_info, exc_cols=exclude_columns)
    else:
        df2 = df

    # Count on the same DataFrame the validation filter runs on, so that
    # ``src_cnt - ins_cnt`` is the number of rejected rows. Previously
    # ``src_cnt`` stayed at 0 and ``filtr_cnt`` came out as ``-ins_cnt``.
    src_cnt = df2.count()

    # Rejected rows: missing id, over-long id/cd, or a non-null ``5yr``
    # value that does not cast to int.
    df3 = df2.filter(
        col("id").isNull()
        | (length(col("id")) > 50)
        | (length(col("cd")) > 10)
        | (col("5yr").isNotNull() & col("5yr").cast("int").isNull())
    )
    logger.info("df3 completed")

    # Only the two id checks get a specific reason; every other rejection
    # falls through to "Miscellaneous Error".
    df4 = df3.withColumn(
        "error_reason",
        when(col("id").isNull(), "id is Null")
        .when(length(col("id")) > 50, "id exceeds column size in base table")
        .otherwise("Miscellaneous Error"),
    )

    # Filter valid records to be loaded to Base Table
    df5 = df2.exceptAll(df3).withColumn("error_reason", lit(""))

    ins_cnt = df5.count()
    filtr_cnt = src_cnt - ins_cnt
    logger.info(f"src_cnt: {src_cnt}")
    logger.info(f"ins_cnt: {ins_cnt}")
    logger.info(f"filtr_cnt: {filtr_cnt}")

    select_df = df5.select(*col_info)
    logger.info("select_df completed")
    # NOTE(review): if ``col_info`` does not list ``error_reason`` the reason
    # column is dropped here before the error rows are written.
    err_df = df4.select(*col_info)
    logger.info("err_df completed")

    logger.info("Final Schema before writing to UC:")
    select_df.printSchema()
    err_df.printSchema()

    # Write the validated rows. The earlier version wrote ``transformed_df``,
    # a name that is never assigned in this function.
    (
        select_df.write
        .mode("append")
        .saveAsTable(f"catalog.schema.{trgt_tbl}")
    )
    (
        err_df.write
        .mode("append")
        .saveAsTable(f"catalog.schema.{err_tbl}")
    )

    if int(load_ctl_key) != -1:
        print("Pushing metrics to UC")
        payload = {
            "load_ctl_key": load_ctl_key,
            "job_id": job_id,
            "batch_id": batch_id,
            "table": trgt_tbl,
            "src_cnt": src_cnt,
            "filter_cnt": filtr_cnt,
            "ins_cnt": ins_cnt,
        }
        print("Payload: {}".format(payload))
        # NOTE(review): this hands ``read.json`` a dict rather than a JSON
        # string; consider serialising the payload explicitly.
        payload_df = spark.read.json(sc.parallelize([payload]))
        display(payload_df)
        payload_df.write.mode("append").saveAsTable("catalog.schema.audit")