Hi,
Question: Are expectations supposed to function in conjunction with create_streaming_table() and apply_changes_from_snapshot()?
Our team is investigating Delta Live Tables. We have a working prototype that uses Auto Loader to ingest files from a MySQL database into a bronze table, and a DLT pipeline then loads them into a silver table.
However, I can't get expectations to work when using dlt.create_streaming_table() in conjunction with apply_changes_from_snapshot(). According to this Databricks doc, expectations are a parameter of dlt.create_streaming_table(). Whenever I use expect_all, expect_all_or_drop, or expect_all_or_fail, the expected behavior does not occur: the pipeline completes without erroring, warning, or dropping rows. The table's Data Quality tab in the right-hand info panel of the pipeline UI is also grayed out, and hovering over it says "This dataset has no data quality to show".
I have verified that the field in question is in fact null for most of the rows in the bronze table.
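For what it's worth, the check was roughly this (token is the field referenced in the expectation below, and source_bronze_table is the same variable used in the code):

from pyspark.sql.functions import col

bronze = spark.read.table(source_bronze_table)
print(bronze.filter(col("token").isNull()).count(), "of", bronze.count(), "rows have a null token")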
Any input would be greatly appreciated.
Thanks
Some code:
import dlt
from pyspark.sql.functions import col, min as min_

def get_next_snapshot(last_snapshot_time):
    # Uses the oldest bronze snapshot_time when last_snapshot_time is None,
    # otherwise the snapshot_time that immediately follows it (simplified).
    df = spark.read.table(source_bronze_table)
    if last_snapshot_time is not None:
        df = df.filter(col("snapshot_time") > last_snapshot_time)
    next_time = df.agg(min_("snapshot_time")).first()[0]
    if next_time is None:
        return None  # caught up; no newer snapshot yet
    return (cleanse(next_time), next_time)  # (DataFrame, snapshot version)
def cleanse(next_snapshot_time):
    return (
        spark
        .read
        .table(source_bronze_table)
        .select(col("*"))  # selecting all the columns for now
        .drop(*exclude_columns)
        .filter(col("snapshot_time") == next_snapshot_time)
    )
# The important bit: expectations seem like they should work here:
dlt.create_streaming_table(
    name=silver_table_name,
    table_properties={"quality": "silver"},
    comment=f"Silver table for {source_bronze_table} with incremental processing",
    expect_all={"token_not_null": "token IS NOT NULL"}  # starting with one simple expectation
    # schema=...  (we want to inject schemas here eventually)
)
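# Variants also tried here (one at a time), with identical results:
#   expect_all_or_drop={"token_not_null": "token IS NOT NULL"}
#   expect_all_or_fail={"token_not_null": "token IS NOT NULL"}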
dlt.apply_changes_from_snapshot(
    target=silver_table_name,
    snapshot_and_version=get_next_snapshot,  # returns None or a (DataFrame, snapshot version) tuple
    keys=[key_column],
    stored_as_scd_type=2
)
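In case it's relevant, my understanding of the snapshot_and_version contract is that DLT calls the function with the latest processed snapshot version (None on the first run) and keeps calling it until it returns None, roughly:

# get_next_snapshot(None) -> (df_t1, t1)   first run: oldest snapshot
# get_next_snapshot(t1)   -> (df_t2, t2)   snapshot immediately after t1
# get_next_snapshot(t2)   -> None          caught up; the update finishes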