02-26-2025 09:56 AM
If I'm running a scheduled batch Autoloader query that reads CSV files from S3 and incrementally loads a Delta table, how can I determine whether new rows were added?
I'm currently trying to do this from the streaming query's lastProgress, as shown below. self._q is the streaming query started elsewhere in the class:
def new_data_appended(self):
    if hasattr(self, "_q"):
        # Block until the scheduled (finite) run has processed the pending files
        self._q.awaitTermination()
        lastProgress = self._q.lastProgress
        if lastProgress is None:
            # No micro-batch has reported progress for this query
            return None
        sink = lastProgress.get("sink", None)
        if sink:
            numOutputRows = sink.get("numOutputRows", None)
        else:
            return None
        if numOutputRows:
            print(f"numOutputRows: {numOutputRows}")
            if numOutputRows == -1:
                return False
            else:
                return True
        else:
            return None
    else:
        raise ValueError("Query status (_q) is not available in self.")
However, numOutputRows always seems to be -1, whether or not rows were added. I could create a DeltaTable object and look at the last history record, but it seems like this information should be available from the query itself.
I "think" my streaming operation is not stateful, meaning this isn't really a real-time stream. My files change every few weeks, but I'm using Autoloader to simplify tracking what is new and loading it incrementally. So there are no real-time batch windows, watermarking, etc.
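A minimal sketch of what the query itself does expose, offered as an illustration rather than the thread's answer. The -1 is consistent with Spark's documented behaviour for `SinkProgress.numOutputRows`, which is -1 for V1 sinks, and the Delta streaming sink is, as far as I know, a V1 sink. The top-level `numInputRows` of each progress update is still populated, so the sketch sums it over `query.recentProgress`, because a scheduled `availableNow` run may execute several micro-batches and `lastProgress` only describes the last one. Assumptions: progress entries are dicts (as in PySpark 3.x), rows read equal rows written (no filtering before the sink), and the run has fewer batches than `spark.sql.streaming.numRecentProgressUpdates` (default 100). The helper names `rows_read` and `new_rows_arrived` are hypothetical.

```python
from typing import Any, Iterable, Mapping, Optional


def rows_read(progresses: Iterable[Optional[Mapping[str, Any]]]) -> int:
    """Sum numInputRows over progress updates, e.g. query.recentProgress.

    Assumes each update is a dict shaped like the StreamingQueryProgress JSON.
    """
    total = 0
    for progress in progresses:
        if progress:
            # numInputRows covers every source read in that micro-batch
            total += int(progress.get("numInputRows", 0) or 0)
    return total


def new_rows_arrived(query) -> bool:
    """Wait for the finite (availableNow/once) query, then report whether rows were read."""
    query.awaitTermination()
    # recentProgress keeps the last N updates (numRecentProgressUpdates, default 100)
    return rows_read(query.recentProgress) > 0
```

Because the query object is recreated on each scheduled run, its `recentProgress` only covers that run, which is what makes the sum a per-run answer.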
Accepted Solutions
02-27-2025 09:45 AM
If you're using foreachBatch, numOutputRows may not reflect the rows actually written (it can show 0, or -1 as in your case) for several reasons. The easiest approach is to look at the latest history entry of the Delta table you are inserting into:
from delta.tables import DeltaTable
DeltaTable.forName(spark, table_name).history(1).collect()  # ...
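A minimal sketch of the history approach, under stated assumptions. A run that finds no new files may produce no new commit at all, so reading only `history(1)` can return an old commit; the sketch therefore records the table version before the query starts and sums the row metrics of any newer commits. It assumes the table is written only by this stream, that `operationMetrics` values are strings (as Delta stores them), and that plain append streams report `numOutputRows` while a `foreachBatch` MERGE reports `numTargetRowsInserted`. The helpers `rows_added_since` and `latest_version` and the `ROW_METRIC_KEYS` tuple are hypothetical names.

```python
from typing import Any, Iterable, Mapping

# Metric keys counting newly written rows, checked in order (assumption:
# MERGE commits report inserts separately; append/streaming commits use numOutputRows).
ROW_METRIC_KEYS = ("numTargetRowsInserted", "numOutputRows")


def rows_added_since(history: Iterable[Mapping[str, Any]], start_version: int) -> int:
    """Sum rows written by commits newer than start_version.

    `history` is a list of dicts shaped like DeltaTable.history() rows,
    e.g. [r.asDict() for r in dt.history().collect()].
    """
    total = 0
    for entry in history:
        if entry["version"] <= start_version:
            continue  # commit existed before this run started
        metrics = entry.get("operationMetrics") or {}
        for key in ROW_METRIC_KEYS:
            if key in metrics:
                total += int(metrics[key])  # Delta stores metric values as strings
                break
    return total


def latest_version(spark, table_name: str) -> int:
    """Current table version; call before starting the query (requires delta-spark)."""
    from delta.tables import DeltaTable

    return DeltaTable.forName(spark, table_name).history(1).collect()[0]["version"]
```

Usage: call `before = latest_version(spark, table_name)` before `writeStream...start()`, then after `awaitTermination()` pass `[r.asDict() for r in DeltaTable.forName(spark, table_name).history().collect()]` to `rows_added_since(..., before)`; a result above 0 means rows were added in this run.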
If this isn't suitable, you can use the Observe API (DataFrame.observe), which collects metrics from your DataFrame and reports them in lastProgress.observedMetrics. More to read here.
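A minimal sketch of the Observe route, assuming PySpark 3.3+ (where `observe` accepts a string name) and progress entries that are dicts. It uses a named observation rather than the `Observation` helper class, since the PySpark docs state `Observation` does not support streaming DataFrames. The observation name `autoloader_metrics`, the `row_count` alias, and the helpers `add_row_count_observation` and `observed_rows` are hypothetical. Summing over `query.recentProgress` rather than reading only `lastProgress` is a design choice for runs that execute several micro-batches.

```python
from typing import Any, Iterable, Mapping, Optional

OBSERVATION_NAME = "autoloader_metrics"  # hypothetical name passed to df.observe


def add_row_count_observation(df):
    """Attach a named row counter to a streaming DataFrame (requires pyspark)."""
    from pyspark.sql import functions as F

    # Each micro-batch's result appears under
    # progress["observedMetrics"][OBSERVATION_NAME]["row_count"].
    return df.observe(OBSERVATION_NAME, F.count(F.lit(1)).alias("row_count"))


def observed_rows(progresses: Iterable[Optional[Mapping[str, Any]]]) -> int:
    """Sum the observed row_count across progress updates (e.g. query.recentProgress)."""
    total = 0
    for progress in progresses:
        if not progress:
            continue
        metrics = (progress.get("observedMetrics") or {}).get(OBSERVATION_NAME) or {}
        total += int(metrics.get("row_count", 0) or 0)
    return total
```

In the writer, apply `add_row_count_observation` to the Autoloader DataFrame before `writeStream`, then call `observed_rows(query.recentProgress)` after `awaitTermination()`.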
a month ago
Thank you!

