
How to set DLT pipeline warning alert?

zmsoft
Contributor

Hi there,

The example description of custom event hooks in the documentation is not clear enough; I do not know how to implement it inside Python functions. See: event-hooks

 

My Code: 

%python
import dlt

# Read the raw change stream from the source table
raw_user_delta_streaming = spark.readStream.option("ignoreDeletes", "true").option("ignoreChanges", "true").table("user_delta")

@dlt.table(comment="The data ingested from kafka topic", table_properties={"pipelines.reset.allowed": "true"})
def user_delta_streaming():
  return raw_user_delta_streaming

from pyspark.sql.functions import round, avg

@dlt.view()
def user_delta_aggr_view():
  return spark.readStream.table("user_delta_streaming").groupBy("gender").agg(round(avg("age_group"), 2).alias("average_age"))

@dlt.on_event_hook(max_allowable_consecutive_failures=None) 
def user_event_hook(event): 
  # Python code defining the event hook 
  streaming_agg_view_df = spark.readStream.view("user_delta_aggr_view") 
  return streaming_agg_view_df.filter(streaming_agg_view_df["average_age"] > 10) 

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_notification_email(): 
  ... 

@on_event_hook 
def my_event_hook(event): 
  print('Received notification that update is stopping: ', event) 
  send_notification_email()

Any suggestions on the user_event_hook function?

Thanks & Regards,

zmsoft

3 REPLIES

Priyanka_Biswas
Databricks Employee

Hi @zmsoft 

The event hook provided, user_event_hook, must be a Python callable that accepts exactly one parameter: a dictionary representation of the event that triggered the execution of the event hook. The return value of the event hook has no significance.

Please find an example here -

@on_event_hook
def my_event_hook(event):
   if (
       event["event_type"] == "update_progress"
       and event["details"]["update_progress"]["state"] == "STOPPING"
   ):
       print("Notified that update is stopping: ", event)

Hi @Priyanka_Biswas ,

Thanks for your answer.

What I want to implement is a safety threshold on a field in a streaming table: when the custom event hook detects that new data exceeds the threshold, it should trigger an alert and notify the user by email. The recipient is not a Databricks user.
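
For example, I am picturing something along these lines (just a sketch; THRESHOLD, the expectation name, and send_notification_email are placeholders of mine):

import dlt
from pyspark.sql.functions import round, avg

THRESHOLD = 10  # placeholder safety threshold

@dlt.table(comment="Aggregated ages, flagged when above the safety threshold")
@dlt.expect("average_age_within_threshold", f"average_age <= {THRESHOLD}")
def user_delta_aggr():
    return (
        spark.readStream.table("user_delta_streaming")
        .groupBy("gender")
        .agg(round(avg("age_group"), 2).alias("average_age"))
    )

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def threshold_email_hook(event):
    # Sketch: when a flow_progress event reports failed expectation records,
    # notify by email (send_notification_email is a placeholder helper)
    if event["event_type"] != "flow_progress":
        return
    dq = event.get("details", {}).get("flow_progress", {}).get("data_quality") or {}
    if any(e.get("failed_records", 0) > 0 for e in dq.get("expectations", [])):
        send_notification_email()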

Thanks & Regards,

zmsoft

Hi @Priyanka_Biswas ,

 

from pyspark.sql import SparkSession, functions as F, types as T
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os


spark = SparkSession.builder \
    .appName("ThresholdAlert") \
    .getOrCreate()


SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your_password')  # read the SMTP password from the environment


SMTP_SERVER = 'smtp.example.com'          # SMTP host of your mail provider
SMTP_PORT = 587                           # STARTTLS port
SMTP_USERNAME = 'your_email@example.com'  # SMTP login
EMAIL_FROM = 'your_email@example.com'     # sender address
EMAIL_TO = 'recipient@example.com'        # recipient address
THRESHOLD = 100                           # alert when your_column exceeds this value

def send_alert_email(value):

    subject = f"Warning: xxx {value} xxx {THRESHOLD}"
    body = f"xxxx {value} xxx {THRESHOLD} xxxxx"

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO

    msg.attach(MIMEText(body, 'plain'))

    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"xxxxx: {subject}")
    except Exception as e:
        print(f"xxxx: {e}")

def process_row(row):
    # Check the monitored column against the threshold for each row
    value = row['your_column']
    if value is not None and value > THRESHOLD:
        send_alert_email(value)

def main():

    schema = T.StructType([
        T.StructField("your_column", T.IntegerType(), True),  
        # add further fields as needed...
    ])


    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "your_topic") \
        .load() \
        .selectExpr("CAST(value AS STRING) as json_str")  


    df = df.select(F.from_json(F.col("json_str"), schema).alias("data")).select("data.*")


    df = df.select("your_column")  

    query = df.writeStream \
        .outputMode("append") \
        .foreachBatch(lambda batch_df, epoch_id: [process_row(row) for row in batch_df.collect()]) \
        .start()


    query.awaitTermination()

if __name__ == "__main__":
    main()

Do you know whether a DLT pipeline supports what the above code does? How can this logic be implemented in a DLT pipeline?
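
For reference, this is roughly how I imagine the Kafka ingestion above would map onto a DLT table, with the threshold expressed as an expectation instead of foreachBatch (again a sketch; the server, topic, and schema are placeholders from my code):

import dlt
from pyspark.sql import functions as F, types as T

THRESHOLD = 100

schema = T.StructType([
    T.StructField("your_column", T.IntegerType(), True),
])

@dlt.table(comment="Kafka ingest, flagged when your_column exceeds the threshold")
@dlt.expect("below_threshold", f"your_column <= {THRESHOLD}")
def kafka_threshold_table():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "your_topic")
        .load()
        .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )

If that is right, the email logic from process_row would then move into an event hook watching flow_progress events, since DLT manages the streaming writes itself.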

Thanks & Regards,

zmsoft
