<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic How to set DLT pipeline warning alert? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/114349#M44788</link>
    <description>&lt;P&gt;Hi there,&lt;/P&gt;&lt;P&gt;The example description of custom event hooks in the documentation is not clear enough, I do not know how to implement it inside python functions.&amp;nbsp;&lt;A href="https://learn.microsoft.com/zh-cn/azure/databricks/dlt/event-hooks" target="_self"&gt;event-hooks&lt;/A&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;My Code:&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%python 
# Read the insertion of data
raw_user_delta_streaming=spark.readStream.option("ignoreDeletes", "true").option("ignoreChanges", "true").table("user_delta")

@dlt.table(comment="The data ingested from kafka topic", table_properties={"pipelines.reset.allowed": "true"} )
def user_delta_streaming(): 
  return raw_user_delta_streaming 

from pyspark.sql.functions import round, avg 
@dlt.view()
def user_delta_aggr_view(): 
  return spark.readStream.table("user_delta_streaming").groupBy("gender").agg(round(avg("age_group"), 2).alias("average_age")) 

@dlt.on_event_hook(max_allowable_consecutive_failures=None) 
def user_event_hook(event): 
  # Python code defining the event hook 
  streaming_agg_view_df = spark.readStream.view("user_delta_aggr_view") 
  return streaming_agg_view_df.filter(streaming_agg_view_df["average_age"] &amp;gt; 10) 

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_notification_email(): 
  ... 

@on_event_hook 
def my_event_hook(event): 
  print('Received notification that update is stopping: ', event) 
  send_notification_email()&lt;/LI-CODE&gt;&lt;P&gt;Any suggestion on user_event_hook function?&lt;/P&gt;&lt;P&gt;Thanks&amp;amp;Regards,&lt;/P&gt;&lt;P&gt;zmsoft&lt;/P&gt;</description>
    <pubDate>Wed, 02 Apr 2025 22:48:01 GMT</pubDate>
    <dc:creator>zmsoft</dc:creator>
    <dc:date>2025-04-02T22:48:01Z</dc:date>
    <item>
      <title>How to set DLT pipeline warning alert?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/114349#M44788</link>
      <description>&lt;P&gt;Hi there,&lt;/P&gt;&lt;P&gt;The example description of custom event hooks in the documentation is not clear enough, I do not know how to implement it inside python functions.&amp;nbsp;&lt;A href="https://learn.microsoft.com/zh-cn/azure/databricks/dlt/event-hooks" target="_self"&gt;event-hooks&lt;/A&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;My Code:&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;%python 
# Read the insertion of data
raw_user_delta_streaming=spark.readStream.option("ignoreDeletes", "true").option("ignoreChanges", "true").table("user_delta")

@dlt.table(comment="The data ingested from kafka topic", table_properties={"pipelines.reset.allowed": "true"} )
def user_delta_streaming(): 
  return raw_user_delta_streaming 

from pyspark.sql.functions import round, avg 
@dlt.view()
def user_delta_aggr_view(): 
  return spark.readStream.table("user_delta_streaming").groupBy("gender").agg(round(avg("age_group"), 2).alias("average_age")) 

@dlt.on_event_hook(max_allowable_consecutive_failures=None) 
def user_event_hook(event): 
  # Python code defining the event hook 
  streaming_agg_view_df = spark.readStream.view("user_delta_aggr_view") 
  return streaming_agg_view_df.filter(streaming_agg_view_df["average_age"] &amp;gt; 10) 

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_notification_email(): 
  ... 

@on_event_hook 
def my_event_hook(event): 
  print('Received notification that update is stopping: ', event) 
  send_notification_email()&lt;/LI-CODE&gt;&lt;P&gt;Any suggestion on user_event_hook function?&lt;/P&gt;&lt;P&gt;Thanks&amp;amp;Regards,&lt;/P&gt;&lt;P&gt;zmsoft&lt;/P&gt;</description>
      <pubDate>Wed, 02 Apr 2025 22:48:01 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/114349#M44788</guid>
      <dc:creator>zmsoft</dc:creator>
      <dc:date>2025-04-02T22:48:01Z</dc:date>
    </item>
    <item>
      <title>Re: How to set DLT pipeline warning alert?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/114461#M44835</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/103629"&gt;@zmsoft&lt;/a&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The event hook provided, &lt;/SPAN&gt;&lt;SPAN&gt;user_event_hook&lt;/SPAN&gt;&lt;SPAN&gt;, must be a Python callable that accepts&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;exactly one parameter - a dictionary representation of the event that triggered the execution of this event hook. &lt;/SPAN&gt;&lt;STRONG&gt;The return value of the event hook has no significance&lt;/STRONG&gt;&lt;SPAN&gt;.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Please find an example here - &lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="markup"&gt;@on_event_hook
def my_event_hook(event):
   if (
       event["event_type"] == "update_progress"
       and event["details"]["update_progress"]["state"] == "STOPPING"
   ):
       print("Notified that update is stopping: ", event)&lt;/LI-CODE&gt;</description>
      <pubDate>Thu, 03 Apr 2025 21:33:03 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/114461#M44835</guid>
      <dc:creator>Priyanka_Biswas</dc:creator>
      <dc:date>2025-04-03T21:33:03Z</dc:date>
    </item>
    <item>
      <title>Re: How to set DLT pipeline warning alert?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/114688#M44909</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/39246"&gt;@Priyanka_Biswas&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;Thanks for your answer.&lt;/P&gt;&lt;P&gt;What I want to implement is to set a security threshold for a field in a streaming table, and when the custom event detects that new data exceeds the security threshold, it triggers an alert, notifies the user by email, and the user is a non-Databricks user.&lt;/P&gt;&lt;P&gt;Thanks&amp;amp;Regards,&lt;/P&gt;&lt;P&gt;zmsoft&lt;/P&gt;</description>
      <pubDate>Mon, 07 Apr 2025 07:59:23 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/114688#M44909</guid>
      <dc:creator>zmsoft</dc:creator>
      <dc:date>2025-04-07T07:59:23Z</dc:date>
    </item>
    <item>
      <title>Re: How to set DLT pipeline warning alert?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/116427#M45312</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/39246"&gt;@Priyanka_Biswas&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.streaming import DataStreamWriter
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
import time


spark = SparkSession.builder \
    .appName("ThresholdAlert") \
    .getOrCreate()


SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your_password')


SMTP_SERVER = 'smtp.example.com'          
SMTP_PORT = 587                          
SMTP_USERNAME = 'your_email@example.com'  
EMAIL_FROM = 'your_email@example.com'     
EMAIL_TO = 'recipient@example.com'         
THRESHOLD = 100                           

def send_alert_email(value):

    subject = f"Warning: xxx {value} xxx {THRESHOLD}"
    body = f"xxxx {value} xxx {THRESHOLD} xxxxx"

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO

    msg.attach(MIMEText(body, 'plain'))

    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"xxxxx: {subject}")
    except Exception as e:
        print(f"xxxx: {e}")

def process_row(row):

    value = row['your_column']  
    if value is not None and value &amp;gt; THRESHOLD:
        send_alert_email(value)

def main():

    schema = T.StructType([
        T.StructField("your_column", T.IntegerType(), True),  
        # xxx...
    ])


    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "your_topic") \
        .load() \
        .selectExpr("CAST(value AS STRING) as json_str")  


    df = df.select(F.from_json(F.col("json_str"), schema).alias("data")).select("data.*")


    df = df.select("your_column")  

    query = df.writeStream \
        .outputMode("append") \
        .foreachBatch(lambda df, epoch_id: [process_row(row) for row in df.collect()]) \
        .start()


    query.awaitTermination()

if __name__ == "__main__":
    main()&lt;/LI-CODE&gt;&lt;P&gt;I wonder if you know whether the DLT pipeline supports the functions of the above code? How can the functions of the above code be implemented in the DLT Pipeline?&lt;/P&gt;&lt;P&gt;Thanks&amp;amp;Regards,&lt;/P&gt;&lt;P&gt;zmsoft&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 24 Apr 2025 06:15:14 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-set-dlt-pipeline-warning-alert/m-p/116427#M45312</guid>
      <dc:creator>zmsoft</dc:creator>
      <dc:date>2025-04-24T06:15:14Z</dc:date>
    </item>
  </channel>
</rss>

