How to set DLT pipeline warning alert?
3 weeks ago
Hi there,
The example of custom event hooks in the documentation (event-hooks) is not clear enough, and I do not know how to implement one inside Python functions.
My Code:
%python
# Read the insertion of data
raw_user_delta_streaming = spark.readStream.option("ignoreDeletes", "true").option("ignoreChanges", "true").table("user_delta")

@dlt.table(comment="The data ingested from kafka topic", table_properties={"pipelines.reset.allowed": "true"})
def user_delta_streaming():
    return raw_user_delta_streaming

from pyspark.sql.functions import round, avg

@dlt.view()
def user_delta_aggr_view():
    return spark.readStream.table("user_delta_streaming").groupBy("gender").agg(round(avg("age_group"), 2).alias("average_age"))

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
    # Python code defining the event hook
    streaming_agg_view_df = spark.readStream.view("user_delta_aggr_view")
    return streaming_agg_view_df.filter(streaming_agg_view_df["average_age"] > 10)

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def send_notification_email():
    ...

@on_event_hook
def my_event_hook(event):
    print('Received notification that update is stopping: ', event)
    send_notification_email()
Any suggestions on the user_event_hook function?
Thanks & Regards,
zmsoft
3 weeks ago
Hi @zmsoft
The event hook, user_event_hook, must be a Python callable that accepts exactly one parameter: a dictionary representation of the event that triggered the hook. The return value of the event hook is ignored, so returning a DataFrame from it has no effect.
Here is an example:
@on_event_hook
def my_event_hook(event):
    if (
        event["event_type"] == "update_progress"
        and event["details"]["update_progress"]["state"] == "STOPPING"
    ):
        print("Notified that update is stopping: ", event)
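Since the hook only receives a dictionary, its decision logic can be kept in a plain function and tried out locally with a hand-built event. The sketch below is one way to do that; the helper name is_stopping_event is hypothetical, and the event layout is assumed to match the example above.

```python
def is_stopping_event(event: dict) -> bool:
    """Return True when the event reports that the pipeline update is stopping.

    Uses .get() so that events without the expected keys are ignored
    instead of raising KeyError inside the hook.
    """
    if event.get("event_type") != "update_progress":
        return False
    details = event.get("details") or {}
    progress = details.get("update_progress") or {}
    return progress.get("state") == "STOPPING"


# Inside a DLT pipeline notebook the hook would then be registered as in
# the example above (dlt is only importable during a pipeline run):
#
# @on_event_hook
# def my_event_hook(event):
#     if is_stopping_event(event):
#         print("Notified that update is stopping: ", event)

# Local check with a hand-built event (layout assumed from the example above).
sample_event = {
    "event_type": "update_progress",
    "details": {"update_progress": {"state": "STOPPING"}},
}
print(is_stopping_event(sample_event))
```

Keeping the check separate from the decorated function makes it possible to test the condition without running a pipeline.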
2 weeks ago
Hi @Priyanka_Biswas,
Thanks for your answer.
What I want to implement is a security threshold on a field in a streaming table. When the custom event hook detects that new data exceeds the threshold, it should trigger an alert and notify a user by email. That user is not a Databricks user.
Thanks & Regards,
zmsoft
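One possible direction, offered as a rough sketch and not a confirmed solution: an event hook receives pipeline events, not table rows, so the threshold itself could be declared as an expectation on the dataset, and the hook could then look for failed expectation counts in flow_progress events and send an email with the standard library. Everything below is an assumption to verify against your own pipeline's event log: the nesting details -> flow_progress -> data_quality -> expectations, the expectation name age_below_threshold, and the helper names, addresses, and SMTP settings are all hypothetical.

```python
import smtplib
from email.message import EmailMessage


def failed_expectations(event: dict) -> list:
    """Return the expectation entries that report at least one failed record.

    ASSUMPTION: flow_progress events carry data-quality metrics under
    details -> flow_progress -> data_quality -> expectations.
    """
    if event.get("event_type") != "flow_progress":
        return []
    flow = (event.get("details") or {}).get("flow_progress") or {}
    quality = flow.get("data_quality") or {}
    expectations = quality.get("expectations") or []
    return [e for e in expectations if (e.get("failed_records") or 0) > 0]


def build_alert_email(failures: list, sender: str, recipient: str) -> EmailMessage:
    """Build a plain-text alert listing each failed expectation."""
    msg = EmailMessage()
    msg["Subject"] = "DLT alert: threshold expectation failed"
    msg["From"] = sender
    msg["To"] = recipient
    lines = [
        f"{f.get('dataset')}: {f.get('name')} failed for {f.get('failed_records')} record(s)"
        for f in failures
    ]
    msg.set_content("\n".join(lines))
    return msg


def send_alert(msg: EmailMessage, host: str, port: int, user: str, password: str) -> None:
    """Send the message over SMTP with STARTTLS (server details are hypothetical)."""
    with smtplib.SMTP(host, port) as server:
        server.starttls()
        server.login(user, password)
        server.send_message(msg)


# Hypothetical wiring inside the pipeline notebook (not runnable locally):
#
# @dlt.table
# @dlt.expect("age_below_threshold", "average_age <= 10")
# def user_delta_aggr(): ...
#
# @on_event_hook
# def threshold_alert_hook(event):
#     failures = failed_expectations(event)
#     if failures:
#         msg = build_alert_email(failures, "alerts@example.com", "user@example.com")
#         send_alert(msg, "smtp.example.com", 587, "smtp_user", "smtp_password")
```

Because the recipient is reached through plain SMTP, they do not need a Databricks account. The cluster does need network access to the mail server, and the credentials would be better read from a secret scope than hard-coded.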

