Hi @Priyanka_Biswas,

Below is a Structured Streaming job that reads JSON records from Kafka and sends an email alert whenever a column value exceeds a threshold:
from pyspark.sql import SparkSession, functions as F, types as T
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os

spark = SparkSession.builder \
    .appName("ThresholdAlert") \
    .getOrCreate()

# SMTP configuration (the password is read from the environment rather than hard-coded)
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', 'your_password')
SMTP_SERVER = 'smtp.example.com'
SMTP_PORT = 587
SMTP_USERNAME = 'your_email@example.com'
EMAIL_FROM = 'your_email@example.com'
EMAIL_TO = 'recipient@example.com'
THRESHOLD = 100
def send_alert_email(value):
    """Send a plain-text alert email for a value that exceeded the threshold."""
    subject = f"Warning: value {value} exceeded threshold {THRESHOLD}"
    body = f"The observed value {value} exceeded the configured threshold {THRESHOLD}."
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = EMAIL_TO
    msg.attach(MIMEText(body, 'plain'))
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.send_message(msg)
        print(f"Alert email sent: {subject}")
    except Exception as e:
        print(f"Failed to send alert email: {e}")
def process_row(row):
    value = row['your_column']
    if value is not None and value > THRESHOLD:
        send_alert_email(value)
def main():
    schema = T.StructType([
        T.StructField("your_column", T.IntegerType(), True),
        # add further fields here as needed...
    ])
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "your_topic") \
        .load() \
        .selectExpr("CAST(value AS STRING) as json_str")
    df = df.select(F.from_json(F.col("json_str"), schema).alias("data")).select("data.*")
    df = df.select("your_column")
    # foreachBatch (not "foreachBatched") receives each micro-batch DataFrame and
    # its epoch id; collect() brings the batch to the driver so alerts can be sent
    query = df.writeStream \
        .outputMode("append") \
        .foreachBatch(lambda batch_df, epoch_id: [process_row(row) for row in batch_df.collect()]) \
        .start()
    query.awaitTermination()

if __name__ == "__main__":
    main()
The job above works as a plain Structured Streaming application: foreachBatch hands each micro-batch to the driver, which sends one alert email per row above the threshold. Do you know whether Delta Live Tables (DLT) pipelines support this kind of logic? If so, how could the functionality of the code above be implemented in a DLT pipeline?
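For context, here is a minimal sketch of what I imagine the transformation side might look like in DLT. The table names (raw_events, threshold_breaches) are my own placeholders and I have not tested this; in particular, I am unsure where the email side effect would fit, since DLT table functions are expected to be pure transformations:

import dlt
from pyspark.sql import functions as F, types as T

schema = T.StructType([
    T.StructField("your_column", T.IntegerType(), True),
])

@dlt.table(comment="Raw events parsed from the Kafka topic")
def raw_events():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "your_topic")
        .load()
        .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
        .select("data.*")
    )

@dlt.table(comment="Rows whose value exceeded the threshold")
def threshold_breaches():
    # The alerting itself would presumably live outside DLT, e.g. a separate
    # job or SQL alert watching this table, since DLT has no email sink
    return dlt.read_stream("raw_events").where(F.col("your_column") > 100)

Is this roughly the right direction, and how would the email alert be triggered in such a setup?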
Thanks & Regards,
zmsoft