
How to set DLT pipeline warning alert?

zmsoft
Contributor

Hi there,

The example of custom event hooks in the documentation is not clear enough, and I do not know how to implement one inside a Python function. (Documentation page: event-hooks)

 

My Code: 

%python
import dlt

# Read the inserted data
raw_user_delta_streaming = spark.readStream.option("ignoreDeletes", "true").option("ignoreChanges", "true").table("user_delta")

@dlt.table(comment="The data ingested from kafka topic", table_properties={"pipelines.reset.allowed": "true"} )
def user_delta_streaming(): 
  return raw_user_delta_streaming 

from pyspark.sql.functions import round, avg 
@dlt.view()
def user_delta_aggr_view():
  return spark.readStream.table("user_delta_streaming").groupBy("gender").agg(round(avg("age_group"), 2).alias("average_age")) 

@dlt.on_event_hook(max_allowable_consecutive_failures=None) 
def user_event_hook(event): 
  # Python code defining the event hook 
  streaming_agg_view_df = spark.readStream.view("user_delta_aggr_view") 
  return streaming_agg_view_df.filter(streaming_agg_view_df["average_age"] > 10) 

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_notification_email(): 
  ... 

@dlt.on_event_hook
def my_event_hook(event): 
  print('Received notification that update is stopping: ', event) 
  send_notification_email()

Any suggestions on the user_event_hook function?

Thanks & Regards,

zmsoft

2 REPLIES

Priyanka_Biswas
Databricks Employee

Hi @zmsoft 

An event hook such as user_event_hook must be a Python callable that accepts exactly one parameter: a dictionary representation of the event that triggered the hook. The return value of the event hook is ignored, so returning a DataFrame from it has no effect.

Here is an example:

import dlt

@dlt.on_event_hook
def my_event_hook(event):
   # Act only on update_progress events that report the STOPPING state
   if (
       event["event_type"] == "update_progress"
       and event["details"]["update_progress"]["state"] == "STOPPING"
   ):
       print("Notified that update is stopping: ", event)
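
To connect this with the email notification from the question, the check in the example above can be factored into plain functions that are testable outside a pipeline. This is only a minimal sketch, assuming the event dictionary has the shape used above. The helper names is_stopping_event and build_alert_email are hypothetical, and the sender and recipient addresses are placeholders you would supply.

```python
import json
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def is_stopping_event(event: dict) -> bool:
    """Return True when the event reports that the update is stopping."""
    if event.get("event_type") != "update_progress":
        return False
    details = event.get("details") or {}
    progress = details.get("update_progress") or {}
    return progress.get("state") == "STOPPING"


def build_alert_email(event: dict, sender: str, recipient: str) -> MIMEMultipart:
    """Build (but do not send) a plain-text alert describing the event."""
    message = MIMEMultipart()
    message["From"] = sender
    message["To"] = recipient
    message["Subject"] = "DLT pipeline update is stopping"
    # Attach the raw event as readable JSON; default=str covers non-JSON values
    body = json.dumps(event, indent=2, default=str)
    message.attach(MIMEText(body, "plain"))
    return message
```

Inside the hook you would call is_stopping_event(event) and, when it returns True, pass the built message to smtplib.SMTP(...).send_message(message) using your own mail server settings, which are not shown here.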

Hi @Priyanka_Biswas ,

Thanks for your answer.

What I want to implement is a security threshold on a field in a streaming table. When the custom event hook detects that new data exceeds the threshold, it should trigger an alert and notify a user by email. That user is not a Databricks user.

Thanks & Regards,

zmsoft
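
One possible direction for the threshold requirement, offered as an assumption and not as a confirmed answer from this thread: because an event hook receives pipeline events and not table rows, the threshold could be declared as an expectation on the table (for example with @dlt.expect), and the hook could then inspect flow_progress events for failed records. The sketch below assumes the event dictionary exposes data quality metrics under details -> flow_progress -> data_quality -> expectations, with name, dataset, passed_records, and failed_records fields as they appear in event log queries. The helper name failed_expectations is hypothetical.

```python
def failed_expectations(event: dict) -> list:
    """Return the expectation entries in a flow_progress event that have failures.

    Assumes (not confirmed in this thread) that data quality metrics live at
    event["details"]["flow_progress"]["data_quality"]["expectations"].
    """
    if event.get("event_type") != "flow_progress":
        return []
    details = event.get("details") or {}
    flow_progress = details.get("flow_progress") or {}
    data_quality = flow_progress.get("data_quality") or {}
    expectations = data_quality.get("expectations") or []
    # Keep only expectations where at least one record violated the rule
    return [e for e in expectations if int(e.get("failed_records", 0)) > 0]
```

A hook could call failed_expectations(event) and send the notification email only when the returned list is not empty, which keeps the alert tied to the threshold instead of to every pipeline event.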
