Hi @seefoods,
You can write a decorator function and use it to perform custom logging.
from functools import wraps
from pyspark.sql import SparkSession
from datetime import datetime
import traceback

spark = SparkSession.builder.getOrCreate()

def logger(success_table: str, error_table: str):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
                # Success log
                log_df = spark.createDataFrame([{
                    "function_name": func.__name__,
                    "status": "SUCCESS",
                    "timestamp": datetime.now()
                }])
                log_df.write.format("delta").mode("append").saveAsTable(success_table)
                return result
            except Exception as e:
                # Error log
                error_df = spark.createDataFrame([{
                    "function_name": func.__name__,
                    "status": "ERROR",
                    "timestamp": datetime.now(),
                    "error_message": str(e),
                    "stack_trace": traceback.format_exc()
                }])
                error_df.write.format("delta").mode("append").saveAsTable(error_table)
                raise  # Optional: re-raise the exception
        return wrapper
    return decorator

# Function definition
@logger(success_table="LOG_SUCCESS", error_table="LOG_ERROR")
def load_data(a, b):
    print(a / b)
    return "Done"

# Calling the function
load_data(20, 10)  # this will log the success message to LOG_SUCCESS
load_data(10, 0)   # this will log the exception message to LOG_ERROR
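Note that because the decorator re-raises, the second call will still fail with a ZeroDivisionError after the row is written to LOG_ERROR. If you want the cell to keep running and to confirm that the log rows landed, here is a minimal sketch that reuses the LOG_SUCCESS / LOG_ERROR table names from the example above:

# Catch the re-raised exception so the cell keeps running after the error is logged
try:
    load_data(10, 0)
except ZeroDivisionError as e:
    print(f"Expected failure, already logged: {e}")

# Read the log tables back to verify the entries were written
spark.table("LOG_SUCCESS").show(truncate=False)
spark.table("LOG_ERROR").show(truncate=False)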
Check the article below for more details:
Custom Logging to Delta Tables in Databricks: Reduce Code Redundancy | by Krishnanand A | Medium