Good data quality isn't just a nice-to-have—it's a necessity, especially in regulated industries like Financial Services. Companies in the industry are investing in artificial intelligence to automate processes and decision-making, creating increasingly sophisticated data ecosystems that demand rigorous quality control.
The success of these AI initiatives relies on high-quality data. End-users know poor data quality when they see it, but to be successful with data and AI, teams must go beyond anecdotes and have quantitative measurements. As the old adage goes, "You can't manage what you don't measure".
Databricks' Data Intelligence Platform provides a unified solution for data and AI, with robust data engineering capabilities for building and monitoring data pipelines. The platform’s Delta Live Tables (DLT) framework lets users create batch and streaming data pipelines in SQL and Python, including the ability to apply quality constraints, known as ‘expectations’, that capture data quality metrics and automatically take action when issues are found.
Measuring what matters requires having an effective data quality framework in mind before starting to implement expectations. We’ll share some of the best practices we’ve seen so that you can feel confident in the quality of the data feeding your downstream applications and consumers.
Anyone who has worked with data pipelines has seen it: data coming from upstream sources changes in unexpected ways without warning, and the team doesn’t find out until someone downstream raises an issue. The upstream changes might be subtle, such as new attributes being collected, new values being allowed in existing fields, or schema changes, but the impact on business processes and data consumers’ trust can be significant.
Possibly the worst part is that data pipelines often continue to execute normally even when the data flowing through them is polluted. ETL jobs complete successfully, dashboards update on schedule, and everything ‘looks green’.
Best case: a data consumer with a strong intuition of the data catches the error and it’s addressed before it can impact the customer experience. Worst case: the issue lingers for weeks and requires a multi-team investigation to identify & resolve the root cause. Adding insult to injury, when issues like this happen in highly regulated industries like Financial Services, they often require detailed root cause analyses to be completed and can trigger regulatory reviews.
It’s not always painfully obvious when poor-quality data has infiltrated a pipeline. A good data quality framework recognizes this and should be designed so that issues are detected as data moves through the pipeline, handled in proportion to their severity (some records are simply flagged, others are dropped, and critical problems halt processing altogether), and surfaced to the team before downstream consumers are affected.
Expectations provide the tooling necessary to apply this nuance (and more!) to pipeline monitoring so that issues are handled appropriately. Using standard SQL Boolean expressions, expectations apply data quality checks to each record passing through a query.
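For example, a single table definition can mix all three violation policies that expectations support: warn (the default, which keeps the record but logs the violation), drop, and fail. The table, source, and column names in the sketch below (validated_events, raw_events, amount, currency) are purely illustrative:
CREATE OR REFRESH STREAMING TABLE validated_events (
  -- Default behavior: keep the record, but count the violation in pipeline metrics
  CONSTRAINT `Amount should be populated` EXPECT (amount IS NOT NULL),
  -- Drop offending records so they never reach downstream consumers
  CONSTRAINT `Amount cannot be negative` EXPECT (amount >= 0) ON VIOLATION DROP ROW,
  -- Halt the update entirely when a critical assumption is broken
  CONSTRAINT `Currency is required` EXPECT (currency IS NOT NULL) ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM STREAM(live.raw_events)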
Databricks couples this with tooling to monitor and understand data quality issues across your entire pipeline. The first piece of this tooling is a set of pipeline metrics, available within the DLT UI, that show how many expectations were applied to each table or view and how many records passed or failed them.
Each DLT pipeline saves events and expectation metrics in the Storage Location defined on the pipeline. The event log table shows the name of each expectation and the number of issues it identified, letting us track granular changes in data quality over time. Refer to the Databricks documentation for additional details about how to access this table and the full list of attributes available.
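As a sketch of what this looks like in practice, the query below aggregates passed and failed record counts per expectation from the event log, following the pattern shown in the Databricks documentation. It assumes the event log has already been exposed as a table or view named event_log_raw (for example, by reading it from the pipeline’s Storage Location or through the event_log() table-valued function):
SELECT
  row_expectations.dataset AS dataset,
  row_expectations.name AS expectation,
  SUM(row_expectations.passed_records) AS passing_records,
  SUM(row_expectations.failed_records) AS failing_records
FROM (
  -- 'flow_progress' events carry per-expectation counts as a JSON payload in the details column
  SELECT
    explode(
      from_json(
        details:flow_progress:data_quality:expectations,
        "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
      )
    ) AS row_expectations
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
)
GROUP BY
  row_expectations.dataset,
  row_expectations.name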
We can make this information even more broadly available and further simplify & centralize data quality monitoring by creating an AI/BI dashboard using these event logs.
But to get these insights about our pipelines, we first need to set up expectations within DLT. Let’s look at some specific examples across a variety of data quality issues:
When new data is seen in a pipeline that doesn’t follow a common format or structure (e.g., custom country codes), it’s often an early indicator of upstream issues that need to be addressed. Diagnosing the root cause can be time-consuming, so as a first order of business, this data shouldn’t be allowed to flow downstream. As a simple example, if we acquire lists of prospective clients for an email marketing campaign, we should drop any record without a valid email address.
CREATE STREAMING TABLE email_prospects (
CONSTRAINT `Emails must contain @` EXPECT (email_address LIKE '%@%') ON VIOLATION DROP ROW
)
COMMENT "Livestream of new prospects for email campaign"
AS SELECT * from STREAM(live.new_prospects)
Invalid data by another name, incomplete data is particularly common when upstream systems or transformations have built-in truncation logic. Continuing with our email marketing example, we might find that a value has an “@” but stops there. One of the great things about the expectations feature is that we can simply add this constraint to our previous query.
CREATE STREAMING TABLE email_prospects (
CONSTRAINT `Emails must contain @` EXPECT (email_address LIKE '%@%') ON VIOLATION DROP ROW,
CONSTRAINT `Emails cannot end with @` EXPECT (SUBSTR(email_address, -1) != '@') ON VIOLATION DROP ROW
)
COMMENT "Livestream of new prospects for email campaign"
AS SELECT * REPLACE(rtrim(email_address) AS email_address) from STREAM(live.new_prospects)
The difference between missing and non-existent data can be difficult to decipher without business context. Consulting with downstream consumers is a good idea when implementing most data quality rules, but especially when monitoring for missing data. A marketer putting together a personalized campaign will likely say that they can’t do anything with prospect data that doesn’t contain any contact information. We can use expectations to remove this data and only provide actionable contacts.
CREATE OR REFRESH MATERIALIZED VIEW marketing_prospects (
CONSTRAINT `Records must contain at least 1 piece of contact information` EXPECT (email_address IS NOT NULL or full_address IS NOT NULL or phone_number IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "Marketing prospects for personalized marketing campaign"
AS SELECT * FROM live.new_prospects
Keep in mind that when you define a Databricks SQL materialized view directly over a streaming live table that has an EXPECT constraint, the system can’t use Change Data Feed to work out what is new versus what has already been processed. As a result, every REFRESH will re-scan and re-process the entire source, not just the newest data. You can avoid this by applying your EXPECT checks upstream and writing the validated data into a static Delta table.
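One way to restructure the previous example along those lines is to enforce the contact-information check in a streaming table first and then build the materialized view over the already-validated data. The dataset names below (validated_prospects, actionable_prospects) are hypothetical:
CREATE OR REFRESH STREAMING TABLE validated_prospects (
  -- Validation happens once, as records stream in
  CONSTRAINT `Records must contain at least 1 piece of contact information` EXPECT (email_address IS NOT NULL OR full_address IS NOT NULL OR phone_number IS NOT NULL) ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM(live.new_prospects);

CREATE OR REFRESH MATERIALIZED VIEW actionable_prospects
COMMENT "Marketing prospects for personalized marketing campaign"
AS SELECT * FROM live.validated_prospects;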
Data consumers are often great at spotting data quality issues because they have an intuition for the range of acceptable values through a mix of business and historical context. Luckily, we can codify some of this context by comparing values to historical statistical ranges and flagging those that fall outside a range. For example, if we capture the net worth of new clients during onboarding, we can set an expectation to flag when we see an outlier value, which could signal that net worth was provided in the wrong currency.
CREATE OR REFRESH MATERIALIZED VIEW net_worth_validation AS
WITH bounds AS (
SELECT
0 as lower_bound,
avg(net_worth) + 4 * stddev(net_worth) as upper_bound
FROM historical_stats
WHERE date >= CURRENT_DATE() - INTERVAL 180 DAYS
)
SELECT
new_clients.*,
bounds.*
FROM new_clients
CROSS JOIN bounds;
CREATE OR REFRESH MATERIALIZED VIEW validated_net_worth (
CONSTRAINT `Net worth must fall within historic range` EXPECT (net_worth BETWEEN lower_bound AND upper_bound)
)
AS SELECT * FROM net_worth_validation;
If we’ve designed our pipeline to only ingest new, incremental values then we typically don’t expect to see values created that are months or years old. However, that’s not always the case. Transactions can be restated, data from legacy systems can be added, and one-off files can be added to enrich existing data. Data that seems stale on the surface can actually be legitimate and valuable. That shouldn’t stop us from monitoring when this happens so that we can verify that there isn’t actually an issue, such as old data being inadvertently replicated.
CREATE STREAMING TABLE validated_client_transactions (
CONSTRAINT `Warn if transaction is more than 90 days old` EXPECT (timestamp >= CURRENT_DATE() - INTERVAL 90 DAYS)
)
COMMENT "Cleared and validated client transactions"
AS SELECT * from STREAM(live.client_transactions)
Perhaps the most challenging data quality issue of them all, inaccurate data can be created for reasons unrelated to any broader issues. A customer service agent may mishear a client on the phone and enter the wrong phone number, a client can enter a typo on a web portal, or a client can change their address and forget to notify the bank.
Unfortunately, there’s no ‘silver bullet’ for these types of issues. Instead, we must listen for issues happening downstream and continuously evolve our expectations. We can do this by creating a Delta Table that contains a set of rules used to tag suspect values. The benefits are that we can apply these rules retroactively and across many datasets, so we can review our entire corpus as we discover new ways to identify inaccurate data. For example, if we learn that customer support agents have been trained to use a value like “9999999999” when a client isn’t willing to provide their income, we can add this to a set of validation rules.
The example below shows what it might look like for a new client questionnaire. First, we define the rules.
CREATE OR REPLACE TABLE
data_accuracy_rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("age_range_valid","age BETWEEN 18 AND 120","validity"),
("income_range_valid","annual_income >= 0 AND annual_income < 10000000","validity"),
("email_format_valid","RLIKE(email, '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}$')","validity")
)
Now, we can apply those rules to our questionnaire responses and drop any records that violate them.
import dlt
from pyspark.sql.functions import col

def get_rules(tag):
    """Load the data quality rules matching a tag and return them as a {name: constraint} dict."""
    rows = spark.read.table("data_accuracy_rules").filter(col("tag") == tag).collect()
    return {row['name']: row['constraint'] for row in rows}

@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_client_questionnaire():
    # Drop any questionnaire response that violates one of the 'validity' rules
    return (
        spark.read.format('csv').option("header", "true")
        .load('/client_questionnaire_responses/')
    )
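Because the rules live in a regular Delta table, evolving them is just a matter of adding rows. For instance, once we learn about the “9999999999” placeholder described earlier, we could append a rule like the one below (the rule name is hypothetical), and it will be picked up the next time the pipeline runs:
-- Hypothetical new rule: flag the placeholder value agents use when income isn't provided
INSERT INTO data_accuracy_rules VALUES
  ("income_not_placeholder", "annual_income != 9999999999", "validity");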
Since inaccurate data can be so hard to identify through rules alone, there are cases where it may be more appropriate to quarantine records for a data steward to review instead of dropping them entirely. Sticking with our client onboarding example, let’s say we want to automatically quarantine records where a welcome email to the client can’t be delivered. To do this, we can create separate processing paths for valid and invalid records in downstream operations.
CREATE STREAMING VIEW raw_new_clients AS
SELECT * FROM STREAM(prod.clients.profile);
CREATE OR REFRESH STREAMING TABLE clients_profile_quarantine(
CONSTRAINT quarantined_row EXPECT (welcome_email_delivery != 'false')
)
PARTITIONED BY (is_quarantined)
AS
SELECT
*,
NOT (welcome_email_delivery != 'false') as is_quarantined
FROM STREAM(raw_new_clients);
CREATE OR REFRESH STREAMING TABLE valid_client_profiles AS
SELECT * FROM STREAM(clients_profile_quarantine) WHERE is_quarantined=FALSE;
CREATE OR REFRESH STREAMING TABLE invalid_client_profiles AS
SELECT * FROM STREAM(clients_profile_quarantine) WHERE is_quarantined=TRUE;
It’s impossible to solve all data quality issues in one shot, but realizing and maintaining high-quality data doesn’t need to be overwhelming.
By implementing a comprehensive data quality framework using expectations in Databricks, you can catch invalid, incomplete, or inaccurate records before they reach downstream consumers, quantify data quality over time through pipeline metrics and the event log, and automatically warn on, drop, or quarantine problem data as it arrives.
With trust in your data, you'll be able to confidently deliver on data and AI initiatives that drive real business value. By making data quality management a core discipline of your data engineering practice, you can ensure that the benefits are long-lasting.