Alberto_Umana
Databricks Employee

Hi @noorbasha534,

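You can use the StreamingQueryListener interface to capture per-batch metrics such as the number of input rows, processing rates, time spent in each phase of a trigger, and batch duration. After you register a listener with spark.streams.addListener, Spark calls it asynchronously when a streaming query starts, each time the query reports progress (normally once per micro-batch), and when the query terminates. This lets you log these metrics while the stream is running. In open-source Spark, the Python version of this interface is available from Spark 3.4.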

Example:

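from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        # Fired once at start; id survives restarts from the checkpoint, runId does not
        print(f"Query started: id={event.id}, runId={event.runId}")

    def onQueryProgress(self, event):
        # Fired after each micro-batch with a StreamingQueryProgress object
        p = event.progress
        print(
            f"Batch {p.batchId}: numInputRows={p.numInputRows}, "
            f"processedRowsPerSecond={p.processedRowsPerSecond}, "
            f"batchDuration={p.batchDuration} ms, durationMs={p.durationMs}"
        )

    def onQueryTerminated(self, event):
        # event.exception is None when the query stopped cleanly
        print(f"Query terminated: id={event.id}, exception={event.exception}")

# Register the listener before starting the query so no events are missed
spark.streams.addListener(MyListener())

df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    # Where Auto Loader stores the inferred schema (placeholder path)
    .option("cloudFiles.schemaLocation", "s3://your-bucket/schemas")
    .load("s3://your-bucket/path")
)

query = (
    df.writeStream.format("delta")
    .option("checkpointLocation", "s3://your-bucket/checkpoints")
    .start("s3://your-bucket/output")
)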
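If you only want the metrics mentioned above rather than the whole progress object, one option is to pull them out of the progress JSON and emit one structured log line per batch. This is a minimal sketch, assuming it receives the string from event.progress.json inside onQueryProgress (PySpark's StreamingQueryProgress exposes a json property). The function names summarize_progress and log_progress, the output keys, and the logger name are hypothetical choices for illustration. The input keys follow Spark's progress JSON, but .get() is used because Spark may omit some fields, such as a rate that is NaN.

```python
import json
import logging
from typing import Any, Dict

# Hypothetical logger name; configure handlers as you normally would
logger = logging.getLogger("stream_metrics")


def summarize_progress(progress_json: str) -> Dict[str, Any]:
    """Flatten selected metrics from a StreamingQueryProgress JSON string.

    Assumes the input is the string from `event.progress.json`.
    Fields missing from the JSON come back as None.
    """
    p = json.loads(progress_json)
    durations = p.get("durationMs", {})
    return {
        "query_id": p.get("id"),
        "batch_id": p.get("batchId"),
        "num_input_rows": p.get("numInputRows"),
        "input_rows_per_sec": p.get("inputRowsPerSecond"),
        "processed_rows_per_sec": p.get("processedRowsPerSecond"),
        # Total wall-clock time of the trigger, in milliseconds
        "trigger_execution_ms": durations.get("triggerExecution"),
        # Time spent executing the batch and handing it to the sink, in ms
        "add_batch_ms": durations.get("addBatch"),
        "batch_duration_ms": p.get("batchDuration"),
    }


def log_progress(progress_json: str) -> str:
    """Log one JSON line per micro-batch and return it (handy for testing)."""
    line = json.dumps(summarize_progress(progress_json), sort_keys=True)
    logger.info(line)
    return line
```

Inside the listener, onQueryProgress could then simply call log_progress(event.progress.json). Keeping the callback this light is generally a good idea, since slow listener code can delay delivery of other listener events.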