02-01-2025 06:37 PM
Hi @noorbasha534,
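You can use the StreamingQueryListener interface to capture per-batch metrics such as the number of input rows, processing rates, and batch durations. Register a listener in your PySpark code, and its onQueryProgress callback receives these metrics each time a micro-batch completes, so you can log them while the stream is running.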
Example:
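from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        # event.progress holds the metrics for the micro-batch that just finished
        print(f"Query made progress: {event.progress}")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

# Register the listener before starting the query so no events are missed
spark.streams.addListener(MyListener())

# Auto Loader needs a schema location when it infers the schema;
# the path below is a placeholder, like the other s3:// paths here
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", "s3://your-bucket/schemas") \
    .load("s3://your-bucket/path")

query = df.writeStream.format("delta") \
    .option("checkpointLocation", "s3://your-bucket/checkpoints") \
    .start("s3://your-bucket/output")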
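
If you only want a few of the metrics rather than the whole progress object, something like the following might help. It is a rough sketch, not the only way to do it: summarize_progress is a hypothetical helper name, and it assumes the progress JSON carries the usual batchId, numInputRows, inputRowsPerSecond, processedRowsPerSecond, and durationMs fields. The sample payload is made up for illustration so the snippet runs without a cluster.

```python
import json

def summarize_progress(progress_json: str) -> dict:
    """Pick a few per-batch metrics out of a streaming progress JSON string.

    Hypothetical helper: field names follow the progress JSON layout,
    and any field that is missing comes back as None (or 0 for rows).
    """
    progress = json.loads(progress_json)
    durations = progress.get("durationMs", {})
    return {
        "batch_id": progress.get("batchId"),
        "num_input_rows": progress.get("numInputRows", 0),
        "input_rows_per_second": progress.get("inputRowsPerSecond"),
        "processed_rows_per_second": progress.get("processedRowsPerSecond"),
        # Total wall-clock time spent on the micro-batch, in milliseconds
        "trigger_execution_ms": durations.get("triggerExecution"),
    }

# Made-up payload standing in for one micro-batch's progress report
sample = json.dumps({
    "batchId": 7,
    "numInputRows": 1200,
    "inputRowsPerSecond": 400.0,
    "processedRowsPerSecond": 600.0,
    "durationMs": {"triggerExecution": 2000},
})

print(summarize_progress(sample))
```

Inside the listener you could then call summarize_progress(event.progress.json) from onQueryProgress and send the resulting dict to your logger or a metrics table instead of printing the full object.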