Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Autoloader streaming table - how to determine from the query if new rows were added?

lprevost
Contributor II

If I'm running a scheduled batch Autoloader query that reads from CSV files on S3 and incrementally loads a Delta table, how can I determine if new rows were added?

I'm currently trying to do this from the streaming query's lastProgress, as follows (self._q is the query, established elsewhere by the class):

def new_data_appended(self):
    """Return True if the last run appended rows, False if not, None if unknown."""
    if not hasattr(self, "_q"):
        raise ValueError("Query status (_q) is not available in self.")
    # Wait for the triggered batch to finish before reading progress.
    self._q.awaitTermination()
    lastProgress = self._q.lastProgress
    sink = lastProgress.get("sink") if lastProgress else None
    if not sink:
        return None
    numOutputRows = sink.get("numOutputRows")
    if numOutputRows is None:
        return None
    print(f"numOutputRows: {numOutputRows}")
    # The sink reports -1 when it does not track output rows.
    if numOutputRows == -1:
        return False
    return True

But I'm finding that numOutputRows always seems to be -1, whether rows were added or not. I guess I could create a DeltaTable object and look at the last history record, but it seems like this should be available from the query.

I "think" my streaming operation is not stateful, meaning this really isn't a real-time stream. My files change every few weeks, but I'm using Autoloader to simplify tracking what's new and loading it incrementally. So there are no real-time batch windows, watermarking, etc.

ACCEPTED SOLUTION

cgrant
Databricks Employee

If you're using foreachBatch, it is possible that numOutputRows will display 0, for a myriad of reasons. The easiest approach is to view the last entry in the Delta table you are inserting into.

from delta.tables import DeltaTable

DeltaTable.forName(spark, table_name).history(1).collect() # ...
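A minimal sketch of reading the write metrics from that history entry. For write operations, the Delta history row carries an operationMetrics map whose values are strings, including numOutputRows. The helper below is illustrative (not from the original post) and is exercised here against a plain dict shaped like a collected history row:

```python
def last_output_rows(history_row):
    """Return numOutputRows from a Delta history row (as a dict), or None."""
    metrics = history_row.get("operationMetrics") or {}
    value = metrics.get("numOutputRows")
    # operationMetrics values are strings in the Delta history schema.
    return int(value) if value is not None else None

# In a real job:
#   row = DeltaTable.forName(spark, table_name).history(1).collect()[0].asDict()
# Simulated here with a dict of the same shape (illustrative values):
sample_row = {
    "operation": "STREAMING UPDATE",
    "operationMetrics": {"numOutputRows": "128"},
}
print(last_output_rows(sample_row))  # 128
```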

If this isn't suitable, you can use the Observe API, which will collect metrics from your DataFrame and put them into lastProgress.observedMetrics. See the Structured Streaming monitoring documentation for details.
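A sketch of that approach, assuming an observed metric named "batch_metrics" with a "row_count" column (both names are illustrative). You attach the metric before writing, then read it back from each progress event; the parsing helper below is run against a sample payload shaped like query.lastProgress:

```python
# Hypothetical setup on the streaming DataFrame (names are illustrative):
#
#   from pyspark.sql import functions as F
#   df = df.observe("batch_metrics", F.count(F.lit(1)).alias("row_count"))
#
# Each progress event then exposes the metric under observedMetrics.

def rows_appended(last_progress):
    """Return the observed row count from a lastProgress dict, or None."""
    metrics = (last_progress or {}).get("observedMetrics", {})
    batch = metrics.get("batch_metrics")
    if batch is None:
        return None
    return batch.get("row_count")

# Sample payload shaped like query.lastProgress (illustrative values):
sample = {"observedMetrics": {"batch_metrics": {"row_count": 42}}}
print(rows_appended(sample))  # 42
print(rows_appended({}))      # None
```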


lprevost
Contributor II

Thank you!
