How do I set up a warning alert in a DLT pipeline?

zmsoft
Contributor

Hi there,

The example of custom event hooks in the documentation (event-hooks) is not clear enough, and I do not know how to implement one inside Python functions.

 

My Code: 

%python
import dlt

# Read the inserted data as a stream
raw_user_delta_streaming = spark.readStream.option("ignoreDeletes", "true").option("ignoreChanges", "true").table("user_delta")

@dlt.table(comment="The data ingested from kafka topic", table_properties={"pipelines.reset.allowed": "true"})
def user_delta_streaming():
  return raw_user_delta_streaming

from pyspark.sql.functions import round, avg

@dlt.view()
def user_delta_aggr_view():
  return spark.readStream.table("user_delta_streaming").groupBy("gender").agg(round(avg("age_group"), 2).alias("average_age"))

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook
  streaming_agg_view_df = spark.readStream.view("user_delta_aggr_view")
  return streaming_agg_view_df.filter(streaming_agg_view_df["average_age"] > 10)

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def send_notification_email():
  ...

@dlt.on_event_hook
def my_event_hook(event):
  print('Received notification that update is stopping: ', event)
  send_notification_email()

Any suggestions for the user_event_hook function?

Thanks & Regards,

zmsoft

Priyanka_Biswas
Databricks Employee

Hi @zmsoft 

The event hook you provide (user_event_hook) must be a Python callable that accepts exactly one parameter: a dictionary representation of the event that triggered the hook. The return value of the event hook has no significance, so the DataFrame returned by user_event_hook is not used.

Here is an example:

@on_event_hook
def my_event_hook(event):
   if (
       event["event_type"] == "update_progress"
       and event["details"]["update_progress"]["state"] == "STOPPING"
   ):
       print("Notified that update is stopping: ", event)
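Because the hook only receives a plain dictionary, the condition inside it can be checked outside a pipeline by moving it into an ordinary function and calling it with a hand-built event. This is a minimal sketch: the helper name is_update_stopping and the sample events are hypothetical, and only the event_type and details.update_progress.state fields used in the example above are assumed to exist.

```python
def is_update_stopping(event: dict) -> bool:
    """Return True for an update_progress event whose state is STOPPING."""
    # "or {}" keeps the check safe for events that carry no update_progress details.
    details = event.get("details") or {}
    update_progress = details.get("update_progress") or {}
    return (event.get("event_type") == "update_progress"
            and update_progress.get("state") == "STOPPING")


# Hypothetical sample events shaped like the ones the hook receives.
stopping = {"event_type": "update_progress",
            "details": {"update_progress": {"state": "STOPPING"}}}
running = {"event_type": "update_progress",
           "details": {"update_progress": {"state": "RUNNING"}}}

print(is_update_stopping(stopping))  # True
print(is_update_stopping(running))   # False
```

Inside the pipeline, the decorated hook can then simply call is_update_stopping(event), which keeps the testable logic separate from the DLT decorator.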

Hi @Priyanka_Biswas,

Thanks for your answer.

What I want is to set a safety threshold on a field of a streaming table. When new data exceeds the threshold, the custom event hook should trigger an alert and notify the user by email. The recipient is not a Databricks user.
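Event hooks see pipeline events, not the rows of a table, so a hook cannot filter the streaming data directly. One possible approach, sketched here under assumptions: declare the threshold as a DLT expectation on the table that holds the field (for example @dlt.expect("age_within_threshold", "age_group <= 10"), where the expectation name is hypothetical), and let the hook look for flow_progress events that report failed records for it. The nested layout used below (details.flow_progress.data_quality.expectations, a list of entries with name, dataset, passed_records and failed_records) is assumed from the pipeline event log schema and should be checked against a real event; the helper name failed_expectations and the sample event are hypothetical.

```python
from typing import Any, Dict, List


def failed_expectations(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the expectation entries of a flow_progress event that have failed records.

    ASSUMPTION: data quality metrics are nested under
    details -> flow_progress -> data_quality -> expectations, and each entry
    holds name, dataset, passed_records and failed_records.
    """
    if event.get("event_type") != "flow_progress":
        return []
    details = event.get("details") or {}
    flow_progress = details.get("flow_progress") or {}
    data_quality = flow_progress.get("data_quality") or {}
    expectations = data_quality.get("expectations") or []
    # Keep only expectations that flagged at least one record.
    return [e for e in expectations if e.get("failed_records", 0) > 0]


# Hypothetical event for illustration only (not captured from a real pipeline).
sample_event = {
    "event_type": "flow_progress",
    "details": {"flow_progress": {"data_quality": {"expectations": [
        {"name": "age_within_threshold", "dataset": "user_delta_streaming",
         "passed_records": 95, "failed_records": 5},
        {"name": "gender_not_null", "dataset": "user_delta_streaming",
         "passed_records": 100, "failed_records": 0},
    ]}}},
}

for e in failed_expectations(sample_event):
    print(f"{e['dataset']}.{e['name']}: {e['failed_records']} record(s) over threshold")
```

In the pipeline, a hook decorated with @dlt.on_event_hook would call failed_expectations(event) and send the email only when the returned list is non-empty. With @dlt.expect (as opposed to expect_or_drop or expect_or_fail), the offending rows are kept in the table and only counted.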

Thanks & Regards,

zmsoft

Hi @Priyanka_Biswas,

 

from pyspark.sql import SparkSession, functions as F, types as T
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os


spark = SparkSession.builder \
    .appName("ThresholdAlert") \
    .getOrCreate()


SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your_password')


SMTP_SERVER = 'smtp.example.com'          
SMTP_PORT = 587                          
SMTP_USERNAME = 'your_email@example.com'  
EMAIL_FROM = 'your_email@example.com'     
EMAIL_TO = 'recipient@example.com'         
THRESHOLD = 100                           

def send_alert_email(value):

    subject = f"Warning: xxx {value} xxx {THRESHOLD}"
    body = f"xxxx {value} xxx {THRESHOLD} xxxxx"

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO

    msg.attach(MIMEText(body, 'plain'))

    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"xxxxx: {subject}")
    except Exception as e:
        print(f"xxxx: {e}")

def process_row(row):

    value = row['your_column']  
    if value is not None and value > THRESHOLD:
        send_alert_email(value)

def main():

    schema = T.StructType([
        T.StructField("your_column", T.IntegerType(), True),  
        # xxx...
    ])


    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "your_topic") \
        .load() \
        .selectExpr("CAST(value AS STRING) as json_str")  


    df = df.select(F.from_json(F.col("json_str"), schema).alias("data")).select("data.*")


    df = df.select("your_column")  

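    # foreachBatch calls this function with each micro-batch DataFrame and its batch id.
    # Filtering before collect() brings only the rows above THRESHOLD back to the driver.
    def alert_on_batch(batch_df, epoch_id):
        for row in batch_df.filter(F.col("your_column") > THRESHOLD).collect():
            process_row(row)

    query = df.writeStream \
        .outputMode("append") \
        .foreachBatch(alert_on_batch) \
        .start()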


    query.awaitTermination()

if __name__ == "__main__":
    main()

Do you know whether a DLT pipeline supports what the code above does? If it does, how can I implement the same functionality in a DLT pipeline?
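In the DLT code from the first post, table and view functions only return DataFrames, so there is no obvious place for the foreachBatch loop. The email part, however, is plain Python and can be called from an event hook, as the send_notification_email() call in the first post already does. Below is a minimal sketch of that part, assuming the pipeline's compute can reach the SMTP server over the network. The function names, their parameters and the violation dicts (shaped like the expectation entries a hook might read from a flow_progress event) are hypothetical; build_alert_email only formats the message, so it can be tested without a mail server.

```python
import smtplib
from email.message import EmailMessage
from typing import Iterable, Mapping


def build_alert_email(violations: Iterable[Mapping], sender: str, recipient: str) -> EmailMessage:
    """Format one plain-text alert listing every expectation that reported failed records."""
    lines = [
        f"{v['dataset']}.{v['name']}: {v['failed_records']} record(s) failed"
        for v in violations
    ]
    msg = EmailMessage()
    msg["Subject"] = f"DLT threshold alert: {len(lines)} expectation(s) failed"
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content("\n".join(lines))
    return msg


def send_alert_email(msg: EmailMessage, host: str, port: int,
                     username: str, password: str) -> None:
    """Send the message over SMTP with STARTTLS (network access to host is assumed)."""
    with smtplib.SMTP(host, port) as server:
        server.starttls()
        server.login(username, password)
        server.send_message(msg)


# Hypothetical usage; the addresses mirror the placeholders in the post above.
msg = build_alert_email(
    [{"dataset": "user_delta_streaming", "name": "age_within_threshold", "failed_records": 5}],
    sender="your_email@example.com",
    recipient="recipient@example.com",
)
print(msg["Subject"])
# send_alert_email(msg, "smtp.example.com", 587, "your_email@example.com", "<password>")
```

Because the message goes through your own SMTP server, the recipient does not need a Databricks account. The password should come from a secure store rather than being hard-coded in the notebook.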

Thanks & Regards,

zmsoft