Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Write both PySpark and Python exception logs to a Delta table

seefoods
Valued Contributor

Hello everyone,

Happy New Year and best wishes to all of us. I am catching both PySpark and Python exceptions, and I want to write these logged errors into a Delta table. Does anyone know the best practice for this?

Thanks 

Cordially, 

1 ACCEPTED SOLUTION


szymon_dybczak
Esteemed Contributor III

Hi @seefoods ,

You can write a decorator function and use it to perform custom logging:

from functools import wraps
from pyspark.sql import SparkSession
from datetime import datetime
import traceback

spark = SparkSession.builder.getOrCreate()

def logger(success_table: str, error_table: str):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)

                # Success log
                log_df = spark.createDataFrame([{
                    "function_name": func.__name__,
                    "status": "SUCCESS",
                    "timestamp": datetime.now()
                }])
                log_df.write.format("delta").mode("append").saveAsTable(success_table)

                return result
            except Exception as e:
                # Error log
                error_df = spark.createDataFrame([{
                    "function_name": func.__name__,
                    "status": "ERROR",
                    "timestamp": datetime.now(),
                    "error_message": str(e),
                    "stack_trace": traceback.format_exc()
                }])
                error_df.write.format("delta").mode("append").saveAsTable(error_table)

                raise  # Optional: re-raise exception
        return wrapper
    return decorator

# Function definition
@logger(success_table="LOG_SUCCESS", error_table="LOG_ERROR")
def load_data(a, b):
    print(a / b)
    return "Done"

# Calling the function
load_data(20, 10)  # logs a success row to LOG_SUCCESS
load_data(10, 0)   # logs the ZeroDivisionError to LOG_ERROR, then re-raises it

Check the article below for more details:

Custom Logging to Delta Tables in Databricks: Reduce Code Redundancy | by Krishnanand A | Medium
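Since the original question asks about catching both PySpark and plain Python exceptions, the decorator can also record the exception class as an extra column, so you can filter `AnalysisException` rows apart from, say, `ZeroDivisionError` rows in the log table. The sketch below is plain Python (no Spark session required) and buffers log records in an in-memory list; on Databricks you would convert that buffer with `spark.createDataFrame(...)` and append it to the Delta table exactly as in the accepted solution. The names `log_buffer` and `logged` are illustrative, not part of any library.

```python
import traceback
from datetime import datetime
from functools import wraps

# In Databricks, flush this buffer to a Delta table via
# spark.createDataFrame(log_buffer).write.format("delta").mode("append")...
log_buffer = []

def logged(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
            log_buffer.append({
                "function_name": func.__name__,
                "status": "SUCCESS",
                "timestamp": datetime.now(),
            })
            return result
        except Exception as e:
            log_buffer.append({
                "function_name": func.__name__,
                "status": "ERROR",
                # e.g. "AnalysisException" for Spark errors,
                # "ZeroDivisionError" for plain Python ones
                "exception_type": type(e).__name__,
                "error_message": str(e),
                "stack_trace": traceback.format_exc(),
                "timestamp": datetime.now(),
            })
            raise  # re-raise so the caller still sees the failure
    return wrapper

@logged
def divide(a, b):
    return a / b

divide(10, 2)          # appends a SUCCESS record
try:
    divide(1, 0)       # appends an ERROR record, then re-raises
except ZeroDivisionError:
    pass
```

Buffering records and writing them in one append (rather than one Delta write per call, as in the decorator above) also avoids creating many tiny files in the log table when the decorated function is called frequently.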




seefoods
Valued Contributor

Thanks a lot @szymon_dybczak 

szymon_dybczak
Esteemed Contributor III

No problem @seefoods 🙂