Data Engineering

apply_changes_from_snapshot with expectations

jdlogos
New Contributor II

Hi,

Question: Are expectations supposed to function in conjunction with create_streaming_table() and apply_changes_from_snapshot?

Our team is investigating Delta Live Tables, and we have a working prototype that uses Auto Loader to ingest files from a MySQL database into a bronze table and then loads them into a silver table with a DLT pipeline.

However, I can't seem to get expectations to work when using dlt.create_streaming_table() in conjunction with apply_changes_from_snapshot. According to this Databricks doc, expectations are a parameter of dlt.create_streaming_table(). But whenever I pass expect_all, expect_all_or_drop, or expect_all_or_fail, the expected behavior does not occur: the pipeline completes without erroring, warning, or dropping rows. Meanwhile, the table's Data Quality tab in the right-hand info panel of the pipeline UI is grayed out, and hovering over it shows "This dataset has no data quality to show".

I have verified that the field in question is in fact null for most of the rows in the bronze table.

Any input would be greatly appreciated.
Thanks

Some code:
import dlt
from pyspark.sql.functions import col

def get_next_snapshot(last_snapshot_time):
    # Some code to get the next snapshot:
    #   - uses the oldest bronze snapshot_time when last_snapshot_time is None,
    #   - otherwise looks up the bronze snapshot_time that immediately
    #     follows last_snapshot_time,
    #   then filters the bronze rows via cleanse() and returns the result
    #   as a tuple (see the sketch below)
    ...

def cleanse(next_snapshot_time):
    return (
        spark
        .read
        .table(source_bronze_table)
        .select(col("*"))  # selecting all the columns for now
        .drop(*exclude_columns)
        .filter(col("snapshot_time") == next_snapshot_time)
    )
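
To make the elided logic concrete, here is a simplified sketch of roughly what get_next_snapshot does (illustrative only, written against the names in this snippet, not our exact code):

def get_next_snapshot(last_snapshot_time):
    snapshots = spark.read.table(source_bronze_table)
    if last_snapshot_time is not None:
        # Only consider snapshots newer than the last one processed
        snapshots = snapshots.filter(col("snapshot_time") > last_snapshot_time)
    # The oldest remaining snapshot_time is the next snapshot to process
    next_row = snapshots.agg({"snapshot_time": "min"}).first()
    next_snapshot_time = next_row[0] if next_row else None
    if next_snapshot_time is None:
        return None  # no more snapshots to process
    return (cleanse(next_snapshot_time), next_snapshot_time)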

# Important bit: expectations seem like they should work here
dlt.create_streaming_table(
    name=silver_table_name,
    table_properties={"quality": "silver"},
    comment=f"Silver table for {source_bronze_table} with incremental processing",
    expect_all={"token_not_null": "token IS NOT NULL"},  # trying just one simple expectation for now
    # schema= Want to inject schemas here
)

dlt.apply_changes_from_snapshot(
    target=silver_table_name,
    snapshot_and_version=get_next_snapshot,  # returns None or a (DataFrame, snapshot version) tuple
    keys=[key_column],
    stored_as_scd_type=2,
)
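
For comparison, decorator-style expectations on an ordinary DLT view or table do populate the Data Quality tab. A minimal sketch over the bronze source would look like this (the view name is made up; this only surfaces metrics and does not gate the silver table):

@dlt.view(name="bronze_validation")
@dlt.expect_all({"token_not_null": "token IS NOT NULL"})
def bronze_validation():
    return spark.read.table(source_bronze_table)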

2 REPLIES

Stefan-Koch
Valued Contributor II

Hi @jdlogos 

Were you able to find a solution? I'm working on the same issue and would love to hear what you learned.

jdlogos
New Contributor II

Hi Stefan-Koch,

We reached out to our account rep and were instructed to create an Azure support ticket, since we do not yet have a paid support plan. We are hoping to negotiate for paid support. However, I do not believe the documentation surrounding the Apply Changes feature is robust enough to describe its constraints and dependencies. The whole point of create_streaming_table is to guarantee that the validation stage won't fail.

Given our limited staff and time, my approach is to build configuration-driven DLT pipelines rather than hand-crafting hundreds of DLT pipeline notebooks. To get that to work, I need to inject the schema and expectations from an external definition. According to the docs, create_streaming_table's parameter list does support schema and expectations, but the feature doesn't seem to work at all: the pipeline succeeds, yet the Data Quality tab indicates there are no data quality definitions defined.
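
To illustrate, the configuration-driven shape I'm aiming for is roughly this (the config structure and values here are made up for the example):

table_config = {
    "name": "customers_silver",
    "schema": "token STRING, name STRING, snapshot_time TIMESTAMP",
    "expectations": {"token_not_null": "token IS NOT NULL"},
    "keys": ["token"],
}

dlt.create_streaming_table(
    name=table_config["name"],
    schema=table_config["schema"],
    expect_all=table_config["expectations"],
)

dlt.apply_changes_from_snapshot(
    target=table_config["name"],
    snapshot_and_version=get_next_snapshot,
    keys=table_config["keys"],
    stored_as_scd_type=2,
)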

When I learn something I will definitely post the answer here.
