04-02-2025 03:48 PM
Hi there,
The example description of custom event hooks in the documentation (event-hooks) is not clear enough, and I do not know how to implement them inside Python functions.
My Code:
%python
import dlt
from pyspark.sql.functions import round, avg

# Read the inserted data as a stream
raw_user_delta_streaming = (
    spark.readStream
    .option("ignoreDeletes", "true")
    .option("ignoreChanges", "true")
    .table("user_delta")
)

@dlt.table(comment="The data ingested from kafka topic", table_properties={"pipelines.reset.allowed": "true"})
def user_delta_streaming():
    return raw_user_delta_streaming

@dlt.view()
def user_delta_aggr_view():
    return (
        spark.readStream.table("user_delta_streaming")
        .groupBy("gender")
        .agg(round(avg("age_group"), 2).alias("average_age"))
    )

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
    # Python code defining the event hook
    streaming_agg_view_df = spark.readStream.view("user_delta_aggr_view")
    return streaming_agg_view_df.filter(streaming_agg_view_df["average_age"] > 10)

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def send_notification_email():
    ...

@on_event_hook
def my_event_hook(event):
    print('Received notification that update is stopping: ', event)
    send_notification_email()
Any suggestions on the user_event_hook function?
Thanks & Regards,
zmsoft
04-03-2025 02:33 PM
Hi @zmsoft
The event hook provided, user_event_hook, must be a Python callable that accepts exactly one parameter - a dictionary representation of the event that triggered the execution of this event hook. The return value of the event hook has no significance.
Please find an example here -
@on_event_hook
def my_event_hook(event):
    if (
        event["event_type"] == "update_progress"
        and event["details"]["update_progress"]["state"] == "STOPPING"
    ):
        print("Notified that update is stopping: ", event)
04-07-2025 12:59 AM
Hi @Priyanka_Biswas ,
Thanks for your answer.
What I want to implement is a security threshold for a field in a streaming table: when a custom event detects that new data exceeds the threshold, it should trigger an alert and notify the user by email. The recipient is a non-Databricks user.
Thanks & Regards,
zmsoft
04-23-2025 11:15 PM
Hi @Priyanka_Biswas ,
from pyspark.sql import SparkSession, functions as F, types as T
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os

spark = SparkSession.builder \
    .appName("ThresholdAlert") \
    .getOrCreate()

# SMTP configuration (password read from the environment)
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your_password')
SMTP_SERVER = 'smtp.example.com'
SMTP_PORT = 587
SMTP_USERNAME = 'your_email@example.com'
EMAIL_FROM = 'your_email@example.com'
EMAIL_TO = 'recipient@example.com'
THRESHOLD = 100

def send_alert_email(value):
    subject = f"Warning: value {value} exceeded threshold {THRESHOLD}"
    body = f"The observed value {value} exceeded the configured threshold {THRESHOLD}."
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO
    msg.attach(MIMEText(body, 'plain'))
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"Alert email sent: {subject}")
    except Exception as e:
        print(f"Failed to send alert email: {e}")

def process_row(row):
    value = row['your_column']
    if value is not None and value > THRESHOLD:
        send_alert_email(value)

def process_batch(batch_df, epoch_id):
    # Collect the micro-batch to the driver and check each row against the threshold
    for row in batch_df.collect():
        process_row(row)

def main():
    schema = T.StructType([
        T.StructField("your_column", T.IntegerType(), True),
        # other fields...
    ])
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "your_topic") \
        .load() \
        .selectExpr("CAST(value AS STRING) as json_str")
    df = df.select(F.from_json(F.col("json_str"), schema).alias("data")).select("data.*")
    df = df.select("your_column")
    query = df.writeStream \
        .outputMode("append") \
        .foreachBatch(process_batch) \
        .start()
    query.awaitTermination()

if __name__ == "__main__":
    main()
Do you know whether a DLT pipeline supports what the code above does? How could this functionality be implemented in a DLT pipeline?
Thanks & Regards,
zmsoft
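One possible mapping of the script above onto a DLT pipeline, sketched here for reference: foreachBatch is not invoked on tables declared inside a DLT pipeline (DLT manages the stream writes itself), so the threshold can instead be declared as an expectation, with an event hook watching the resulting data-quality metrics and sending the email. This is a minimal sketch rather than a verified implementation: it assumes THRESHOLD and send_alert_email carry over from the script above, and that the hook's event dictionary mirrors the DLT event log layout (details -> flow_progress -> data_quality -> expectations).

import dlt
from pyspark.sql import functions as F, types as T

schema = T.StructType([T.StructField("your_column", T.IntegerType(), True)])

# Declare the threshold as a DLT expectation: violating rows are kept but
# counted in the pipeline's data-quality metrics.
@dlt.table(comment="Kafka events checked against the security threshold")
@dlt.expect("below_threshold", f"your_column <= {THRESHOLD}")
def user_events():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "your_topic")
        .load()
        .selectExpr("CAST(value AS STRING) AS json_str")
        .select(F.from_json("json_str", schema).alias("data"))
        .select("data.*")
    )

# Watch flow_progress events for failed expectations and send the alert
# email. The exact event dictionary layout is an assumption based on the
# DLT event log schema.
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def alert_on_threshold_breach(event):
    if event["event_type"] != "flow_progress":
        return
    dq = event.get("details", {}).get("flow_progress", {}).get("data_quality") or {}
    for exp in dq.get("expectations", []):
        if exp.get("name") == "below_threshold" and exp.get("failed_records", 0) > 0:
            send_alert_email(exp["failed_records"])  # helper from the script above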