Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to use rules dynamically in LDP

IM_01
Contributor

Hi

I see there is a way to store rules in a table and use them in Python when implementing LDPs. How can I generate/read rules dynamically in the SQL way of implementing LDPs? Could you please help me with this?

#DLT

4 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @IM_01 ,

Yes, you can store your rules in a Delta table. For the sake of example, this table is filled with static rules, but of course you can insert into it dynamically.

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

Then you can read those rules in the following way:

from pyspark import pipelines as dp
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
  Load data quality rules from a table.

  :param tag: tag to match
  :return: dictionary of rules that matched the tag
  """
  df = spark.read.table("rules").filter(col("tag") == tag).collect()
  return {
      row['name']: row['constraint']
      for row in df
  }

@dp.table
@dp.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dp.table
@dp.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
  return (
    spark.read.table("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
  )

Check following page for more details:

Expectation recommendations and advanced patterns | Databricks on AWS

IM_01
Contributor

Hi @szymon_dybczak,
Is there a way we can implement expect_all or dynamic rules using SQL in LDP instead of Python?

szymon_dybczak
Esteemed Contributor III

Hi @IM_01 ,

To be honest, I don't know. I've always used the Python API when working with declarative pipelines 🙂

SteveOstrowski
Databricks Employee

Hi @IM_01,

The feature you are looking for, storing data quality rules in a table and applying them dynamically, is fully supported in Lakeflow Spark Declarative Pipelines (SDP) through the Python API. Unfortunately, there is currently no equivalent SQL-only mechanism for loading expectations dynamically from a table. The SQL CONSTRAINT ... EXPECT syntax requires rules to be defined statically inline in the table definition.

That said, here is the recommended approach using Python, and a hybrid workaround if you prefer to keep your transformation logic in SQL.


OPTION 1: FULLY DYNAMIC RULES WITH PYTHON

Step 1. Create a rules table that stores your expectations:

CREATE OR REPLACE TABLE rules AS
SELECT col1 AS name, col2 AS constraint, col3 AS tag
FROM (
  VALUES
    ("website_not_null", "Website IS NOT NULL", "validity"),
    ("fresh_data", "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'", "maintained"),
    ("social_media_access", "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)", "maintained")
)

Step 2. Create a helper function in your pipeline notebook that reads rules from the table:

from pyspark import pipelines as dp
from pyspark.sql.functions import expr, col

def get_rules(tag):
    """Load data quality rules from a Delta table filtered by tag."""
    df = spark.read.table("rules").filter(col("tag") == tag).collect()
    return {row['name']: row['constraint'] for row in df}

Step 3. Apply the rules dynamically using expect_all, expect_all_or_drop, or expect_all_or_fail:

@dp.table
@dp.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
    return (
        spark.read.format('csv')
            .option("header", "true")
            .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
    )

@dp.table
@dp.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
    return (
        spark.read.table("raw_farmers_market")
            .filter(expr("Organic = 'Y'"))
    )

This way, you can add, update, or remove rules by simply modifying the rules table, with no changes needed to the pipeline code itself.
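For instance, disabling a rule is just a DELETE against the rules table, and adding one is a plain INSERT; the dictionary that `get_rules` returns on the next pipeline update reflects the change automatically. A minimal, framework-free sketch of that lookup (a list of dicts stands in for the Delta table rows; the names here are illustrative only):

```python
# Illustrative only: a list of dicts stands in for spark.read.table("rules").
# In the real pipeline the rules live in a Delta table, so edits to that table
# change the expectations picked up on the next update, with no code changes.
RULES_TABLE = [
    {"name": "website_not_null", "constraint": "Website IS NOT NULL", "tag": "validity"},
    {"name": "fresh_data", "constraint": "updateTime > '2010-01-01'", "tag": "maintained"},
]

def get_rules(tag):
    """Return {name: constraint} for all rules matching the tag."""
    return {r["name"]: r["constraint"] for r in RULES_TABLE if r["tag"] == tag}

# Simulate adding a rule "dynamically" (an INSERT INTO rules in the real table):
RULES_TABLE.append({"name": "has_city", "constraint": "city IS NOT NULL", "tag": "validity"})

print(sorted(get_rules("validity")))  # the new rule is picked up on the next lookup
```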


OPTION 2: HYBRID APPROACH (SQL TRANSFORMATIONS WITH PYTHON EXPECTATIONS)

If you prefer writing your transformation logic in SQL but still want dynamic expectations, you can use a hybrid pattern. Define your transformation as a SQL temporary view, then wrap it in a Python table definition where you apply dynamic expectations:

CREATE TEMPORARY VIEW raw_farmers_market_sql AS
SELECT * FROM csv.`/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/`

Then in a Python cell in the same pipeline notebook:

@dp.table
@dp.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
    return spark.read.table("raw_farmers_market_sql")

This gives you the flexibility of SQL for data transformations while leveraging Python for dynamic rule application.


OPTION 3: PYTHON MODULE FOR RULES (ALTERNATIVE TO TABLE)

If you prefer storing rules in code rather than a table, you can create a Python module with your rules and import them:

# rules_module.py
def get_rules_as_list_of_dict():
    return [
        {"name": "website_not_null", "constraint": "Website IS NOT NULL", "tag": "validity"},
        {"name": "fresh_data", "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'", "tag": "maintained"},
    ]

Then in your pipeline:

from rules_module import get_rules_as_list_of_dict

def get_rules(tag):
    return {
        row['name']: row['constraint']
        for row in get_rules_as_list_of_dict()
        if row['tag'] == tag
    }


WHY SQL DOES NOT SUPPORT DYNAMIC EXPECTATIONS

In the SQL syntax for Lakeflow Spark Declarative Pipelines, expectations are defined as static CONSTRAINT clauses:

CREATE OR REFRESH STREAMING TABLE my_table (
  CONSTRAINT valid_age EXPECT (age BETWEEN 0 AND 120) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(source_table)

There is no SQL syntax to reference or load constraints from another table at runtime. This is a limitation of the declarative SQL approach. The Python API, with its expect_all family of decorators that accept dictionaries, is the supported path for dynamic rule management.
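To make the contrast concrete, here is a framework-free sketch of what the dictionary-based `expect_all_or_drop` pattern does conceptually: every constraint in the dictionary is evaluated per row, and any row failing any constraint is dropped. The real implementation evaluates the constraint strings as SQL expressions against a DataFrame; the plain predicate functions below are illustrative stand-ins:

```python
# Conceptual sketch only: real SDP compiles constraint strings to SQL
# expressions; here plain Python predicates stand in for them.
rules = {
    "website_not_null": lambda row: row.get("Website") is not None,
    "has_name": lambda row: bool(row.get("name")),
}

def expect_all_or_drop(rows, rules):
    """Keep only rows satisfying every rule, mirroring ON VIOLATION DROP ROW."""
    return [row for row in rows if all(check(row) for check in rules.values())]

data = [
    {"name": "Market A", "Website": "https://a.example"},
    {"name": "Market B", "Website": None},          # dropped: website_not_null
    {"name": "", "Website": "https://c.example"},   # dropped: has_name
]
print(expect_all_or_drop(data, rules))  # only Market A survives
```

Because the rules argument is an ordinary dictionary, it can be built at runtime from any source, which is exactly what the SQL CONSTRAINT syntax cannot do.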


DOCUMENTATION REFERENCES

For full details, see the official docs:
https://docs.databricks.com/en/ldp/expectation-patterns.html
https://docs.databricks.com/en/dlt/expectations.html

* This reply was drafted with an agent system I built, which researches responses against the documentation and prior context available to me. I personally review each draft for obvious issues and to monitor system reliability, and I update it when I detect drift, but there is still a small chance something is inaccurate, especially if you are experimenting with brand-new features.