Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

apply_changes_from_snapshot with expectations

jdlogos
New Contributor

Hi,

Question: Are expectations supposed to function in conjunction with create_streaming_table() and apply_changes_from_snapshot?

Our team is investigating Delta Live Tables. We have a working prototype that uses Auto Loader to ingest files exported from a MySQL database into a bronze table, and a DLT pipeline that loads them into a silver table.
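
For context, the bronze ingest looks roughly like this (a simplified sketch; the real file format, options, and landing path differ):

import dlt

landing_path = "/Volumes/raw/mysql_exports"  # hypothetical landing location

@dlt.table(name="bronze_snapshots", table_properties={"quality": "bronze"})
def bronze_snapshots():
    # Auto Loader incrementally picks up new export files from the landing path
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")  # assumption; our actual format differs
        .load(landing_path)
    )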

However, I can't get expectations to work when using dlt.create_streaming_table() in conjunction with apply_changes_from_snapshot. According to this Databricks doc, expectations are a parameter of dlt.create_streaming_table(). But whether I pass expect_all, expect_all_or_drop, or expect_all_or_fail, the expected behavior does not occur: the pipeline completes without erroring, warning, or dropping rows, and the table's Data Quality tab in the right-hand info panel of the pipeline UI is grayed out, with a hover tooltip that says "This dataset has no data quality to show".
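
For reference, here is the decorator form of expectations on an ordinary DLT table, which per the docs does surface in the Data Quality tab, and which I assumed the expect_all parameter of create_streaming_table() mirrors (minimal sketch; the table and source names are illustrative):

import dlt

@dlt.table(name="example_silver")
@dlt.expect_all({"token_not_null": "token IS NOT NULL"})  # same expectation as below
def example_silver():
    # Plain batch read, just for illustration
    return spark.read.table("example_bronze")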

I have verified that the column in question (token) is in fact null for most of the rows in the bronze table.

Any input would be greatly appreciated.
Thanks

Some code:

import dlt
from pyspark.sql.functions import col

def get_next_snapshot(last_snapshot_time):
    # Uses the oldest bronze snapshot_time when last_snapshot_time is None,
    # otherwise the snapshot_time from the bronze table that immediately
    # follows last_snapshot_time; returns the cleansed rows for that
    # snapshot together with the snapshot version as a tuple.
    df = spark.read.table(source_bronze_table)
    if last_snapshot_time is not None:
        df = df.filter(col("snapshot_time") > last_snapshot_time)
    next_snapshot_time = df.agg({"snapshot_time": "min"}).first()[0]
    if next_snapshot_time is None:
        return None  # no newer snapshot to process
    return (cleanse(next_snapshot_time), next_snapshot_time)

def cleanse(next_snapshot_time):
    return (
        spark.read
        .table(source_bronze_table)
        .select(col("*"))  # selecting all columns for now
        .drop(*exclude_columns)
        .filter(col("snapshot_time") == next_snapshot_time)
    )

# Important bit: expectations seem like they should work here
dlt.create_streaming_table(
    name=silver_table_name,
    table_properties={"quality": "silver"},
    comment=f"Silver table for {source_bronze_table} with incremental processing",
    expect_all={"token_not_null": "token IS NOT NULL"},  # trying just one simple expectation for now
    # schema= want to inject schemas here
)

dlt.apply_changes_from_snapshot(
    target=silver_table_name,
    snapshot_and_version=get_next_snapshot,  # returns None or a (DataFrame, snapshot version) tuple
    keys=[key_column],
    stored_as_scd_type=2
)

