I have a requirement to apply inverse data quality (DQ) rules to a table so that I can track the invalid data. I can do that with the following approach:
import dlt
rules = {}
quarantine_rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
# concatenate inverse rules
quarantine_rules["invalid_record"] = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
    name="raw_farmers_market"
)
def get_farmers_market_data():
    return (
        spark.read.format('csv').option("header", "true")
        .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
    )
@dlt.table(
    name="valid_farmers_market"
)
@dlt.expect_all_or_drop(rules)
def get_valid_farmers_market():
    return (
        dlt.read("raw_farmers_market")
        .select("MarketName", "Website", "Location", "State",
                "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
    )
@dlt.table(
    name="invalid_farmers_market"
)
@dlt.expect_all_or_drop(quarantine_rules)
def get_invalid_farmers_market():
    return (
        dlt.read("raw_farmers_market")
        .select("MarketName", "Website", "Location", "State",
                "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
    )
However, when I store the invalid data in another table, invalid_farmers_market, it contains every row that fails the combined rule, with no indication of which rule failed. I am applying the following two rules:
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
Is there any way to tell which specific rule caused a given row to end up in the invalid table: rules["valid_website"], rules["valid_location"], or both? I would like to take the appropriate action for the specific column.
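To make the goal concrete, here is a minimal sketch of the kind of per-rule information I am after. Instead of one combined "invalid_record" expression, it derives one inverse expression per rule. The helper `build_failure_flags` and the `failed_` prefix are names I made up for illustration; this is plain Python string handling and has not been run inside a DLT pipeline.

```python
# Sketch only: one inverse expression per rule, rather than a single
# combined "invalid_record" expression. The "failed_" prefix and the
# helper name are hypothetical, chosen just for this illustration.
rules = {
    "valid_website": "(Website IS NOT NULL)",
    "valid_location": "(Location IS NOT NULL)",
}


def build_failure_flags(rules):
    """Map each rule name to a SQL expression that is true when that rule fails."""
    return {
        "failed_{0}".format(name): "NOT({0})".format(condition)
        for name, condition in rules.items()
    }


failure_flags = build_failure_flags(rules)

# Show the expression that would flag a failure of each individual rule.
for column_name, expression in failure_flags.items():
    print(column_name, "=>", expression)
```

My assumption is that each of these expressions could be added to the invalid table as a boolean column (for example with `withColumn` and `pyspark.sql.functions.expr`), so that every quarantined row shows which rule or rules it failed, but I do not know whether that is the recommended way to do it in DLT.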