Here's a paradox every data team faces: data quality is everyone's problem, but implementing it requires specialized skills that few possess.
Data engineers understand the importance of null checks and referential integrity. Data analysts know which business rules matter. Data stewards define governance policies. But translating all of this into executable data quality checks requires a rare combination of domain knowledge and engineering skill.
The result? Data quality becomes a bottleneck. Rules are written reactively (after problems occur), coverage is inconsistent, and there's no single source of truth for what checks exist or why they were created.
Every organization has analysts and business users who understand their data better than anyone. They know which values should never be negative, which fields must always be filled, and which relationships must hold true. But turning that knowledge into automated data quality checks? That's where they hit a wall.
The reality today is dependency on both sides: analysts rely on engineers to turn their knowledge into code, and engineers rely on analysts to explain which rules matter. Analysts feel powerless. Engineers feel overwhelmed. And data quality suffers.
DQX Data Quality Manager changes this dynamic.
We built an intuitive web interface that empowers analysts, data stewards, and business users to create production-ready data quality rules - without writing a single line of code. Simply describe what "good data" looks like in plain English, and AI translates that into executable checks, democratizing data quality for the entire organization.
| Component | Technology | Purpose |
|---|---|---|
| Frontend | HTML/CSS/JavaScript | Responsive web interface |
| Backend | Flask 3.0 + Gunicorn | REST API and page rendering |
| Data Quality | Databricks Labs DQX | Rule execution engine |
| AI | Model Serving Endpoint | Natural language understanding |
| Compute | Serverless Jobs | Rule generation and validation |
| Data Catalog | Unity Catalog | Data discovery and governance |
| Storage | Lakebase Autoscaling | Rule versioning and history |
| Deployment | Databricks Asset Bundles | Infrastructure as code |
| CI/CD | GitHub Actions + OIDC | Automated deployments |
Before diving into features, let's understand how the pieces fit together.
Data engineers, analysts, and stewards access the application through their browser. Each user authenticates via Databricks OAuth, and their identity flows through every operation.
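A minimal sketch of how the app reads that identity on each request. Databricks Apps forwards it as HTTP headers; the helper name here is hypothetical, but the header names are the same ones used throughout the code below:

```python
from flask import request

def current_user() -> dict:
    """Read the identity that Databricks Apps forwards with every request."""
    return {
        "email": request.headers.get("x-forwarded-email"),
        "token": request.headers.get("x-forwarded-access-token"),  # used for OBO calls
    }
```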
The Flask application runs on Databricks Apps, which handles:
Two types of compute handle different workloads:
Three data stores serve different purposes:
Let's follow a data engineer named Josh as she creates quality rules for a new table.
Josh opens the DQX Data Quality Manager and is greeted by the Generator page. The first step is selecting her target table.
The catalog browser shows only what Josh has permission to access. Behind the scenes, the app executes SQL using her forwarded OAuth token:
def get_catalogs(self) -> List[str]:
    """Get list of available catalogs using user's permissions."""
    try:
        # This SQL runs with the user's token (OBO)
        catalogs = self.execute_sql("SHOW CATALOGS")
        return catalogs if catalogs else ["main"]
    except Exception as e:
        print(f"Error listing catalogs: {e}")
        return ["main"]

def get_schemas(self, catalog: str) -> List[str]:
    """Get schemas in a catalog using user's permissions."""
    return self.execute_sql(f"SHOW SCHEMAS IN `{catalog}`")

def get_tables(self, catalog: str, schema: str) -> List[str]:
    """Get tables in a schema using user's permissions."""
    result = self.execute_sql_with_schema(
        f"SHOW TABLES IN `{catalog}`.`{schema}`"
    )
    return [row.get("tableName") for row in result["rows"]]
Josh selects her table as shown in the previous image. Immediately, the app displays a sample of the data:
This preview helps Josh understand the data structure before writing rules. The sample query also uses her permissions—if she can't SELECT from this table, she can't see the preview.
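The preview itself is just a plain SELECT over the same OBO connection. A minimal sketch, with `get_table_sample` as a hypothetical helper alongside the catalog-browsing methods above:

```python
def get_table_sample(self, catalog: str, schema: str, table: str,
                     limit: int = 10) -> Dict[str, Any]:
    """Preview the selected table using the user's own permissions."""
    # Runs over the OBO connection, so Josh only sees data she can SELECT
    return self.execute_sql_with_schema(
        f"SELECT * FROM `{catalog}`.`{schema}`.`{table}` LIMIT {limit}"
    )
```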
Now comes the magic. Instead of writing JSON or YAML or learning DQX syntax, Josh types:
"Generate data quality rules for completeness, validity, and consistency checks."
Josh clicks "Generate Rules" and watches the progress indicator. Behind the scenes, a sophisticated generator springs into action.
The generation process involves multiple steps:
The app triggers a serverless job with Josh's requirements:
def trigger_dq_job(self, table_name: str, user_prompt: str,
                   sample_limit: Optional[int] = None) -> Dict[str, Any]:
    """Trigger the DQ rule generation job."""
    # Jobs use service principal (no user token scope for jobs API)
    client = self._get_client(use_user_token=False)

    job_parameters = {
        "table_name": table_name,
        "user_prompt": user_prompt
    }
    if sample_limit:
        job_parameters["sample_limit"] = str(sample_limit)

    response = client.jobs.run_now(
        job_id=int(Config.DQ_GENERATION_JOB_ID),
        job_parameters=job_parameters
    )
    return {"run_id": response.run_id}
The serverless notebook profiles the table to understand its structure:
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.profiler import DQProfiler

# Profile the data
profiler = DQProfiler(WorkspaceClient())
profile_result = profiler.profile(
    df=spark.table(table_name).limit(sample_limit),
    output_type="dict"
)

# Extract key statistics
column_stats = []
for col in profile_result["columns"]:
    column_stats.append({
        "name": col["name"],
        "type": col["type"],
        "null_count": col["null_count"],
        "distinct_count": col["distinct_count"],
        "sample_values": col.get("sample_values", [])[:5]
    })
The profile and user prompt are sent to Claude:
import json

from databricks.labs.dqx.generator import DQGenerator

# Build context for AI
context = f"""
Table: {table_name}
Column Statistics: {json.dumps(column_stats, indent=2)}
User Requirements:
{user_prompt}
Generate DQX-compatible data quality rules that address these requirements.
Consider the actual data statistics when choosing appropriate checks.
"""

# Generate rules using DQX Generator (which calls Model Serving)
generator = DQGenerator(WorkspaceClient())
rules = generator.generate_dq_rules_ai_assisted(
    user_input=context,
    input_config=table_name,
    summary_stats=summary_stats_from_profiler
)
After 30-60 seconds, Josh sees her generated rules:
The AI understood Josh's requirements and translated them into proper DQX syntax, choosing appropriate check functions and criticality levels.
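To make that concrete, here is an illustrative example of what generated rules can look like in DQX's check-metadata format. The column names are made up for this example, and exact check argument names can vary between DQX versions:

```python
checks = [
    {
        "name": "customer_id_is_not_null",
        "criticality": "error",
        "check": {
            "function": "is_not_null",
            "arguments": {"column": "customer_id"},
        },
    },
    {
        "name": "order_amount_is_not_negative",
        "criticality": "warn",
        "check": {
            "function": "sql_expression",
            "arguments": {"expression": "order_amount >= 0"},
        },
    },
]
```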
Before saving, Josh wants to know: Do these rules actually work with my data?
She clicks "Validate Rules" to test against the actual table:
The validation job uses the DQX Engine to apply checks:
from pyspark.sql.functions import col

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

# Initialize engine
dq_engine = DQEngine(WorkspaceClient())

# Load data
df = spark.table(table_name)

# Apply checks and split results
valid_df, invalid_df = dq_engine.apply_checks_and_split(df, checks)

# Calculate statistics
total_rows = df.count()
valid_rows = valid_df.count()
invalid_rows = invalid_df.count()

# Get per-rule breakdown
rule_results = []
for check in checks:
    rule_name = check.get("name", "unnamed")
    violations = invalid_df.filter(
        col("_dq_issues").contains(rule_name)
    ).count()
    rule_results.append({
        "rule": rule_name,
        "violations": violations,
        "pass_rate": ((total_rows - violations) / total_rows) * 100
    })
Josh sees how the DQ rules were applied to the selected table:
Before saving, Josh clicks "Analyze with AI" to get expert insights:
def analyze_rules(rules: List[Dict], table_name: str,
                  user_prompt: str) -> Dict[str, Any]:
    """Analyze DQ rules using AI via SQL Statement Execution."""
    analysis_prompt = f"""You are a Data Quality expert.
Analyze the following DQ rules for table '{table_name}'.
User's original requirement: {user_prompt}
Generated Rules: {json.dumps(rules, indent=2)}
Provide a JSON response with:
- summary: 2-3 sentence overview
- rule_analysis: explanation of each rule
- coverage_assessment: how well rules cover requirements
- recommendations: suggestions for additional rules
- overall_quality_score: 1-10 rating
"""

    # Execute via ai_query() SQL function (uses user's token)
    sql = f"""
    SELECT ai_query(
        '{Config.MODEL_SERVING_ENDPOINT}',
        '{analysis_prompt}'
    ) as analysis
    """

    response = ws.statement_execution.execute_statement(
        warehouse_id=warehouse_id,
        statement=sql,
        wait_timeout="0s"  # Async execution
    )

    # Poll for results...
The AI analysis gives Josh a short summary of the rule set, an explanation of each rule, an assessment of how well the rules cover her requirements, suggestions for additional rules, and an overall quality score.
Satisfied with the rules, Josh clicks "Confirm & Save":
def save_rules(table_name: str, rules: List[Dict],
               user_prompt: str, ai_summary: Optional[Dict] = None) -> Dict:
    """Save DQ rules to Lakebase with versioning."""
    # `cursor` comes from the Lakebase PostgreSQL connection (see get_connection below)

    # Get next version number
    version = LakebaseService.get_next_version(table_name)

    # Deactivate previous versions
    cursor.execute("""
        UPDATE dq_rules_events
        SET is_active = FALSE
        WHERE table_name = %s AND is_active = TRUE
    """, (table_name,))

    # Insert new version
    cursor.execute("""
        INSERT INTO dq_rules_events
            (id, table_name, version, rules, user_prompt,
             ai_summary, created_by, created_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
        RETURNING id, version, created_at
    """, (
        str(uuid.uuid4()),
        table_name,
        version,
        json.dumps(rules),
        user_prompt,
        json.dumps(ai_summary) if ai_summary else None,
        "dq-rule-generator-app"
    ))

    return {
        "success": True,
        "version": version,
        "message": f"Rules saved as version {version}"
    }
Josh's rules are now saved in Lakebase: versioned, flagged as the active rule set for the table, and stamped with when and by whom they were created.
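From here, any downstream pipeline can pick up the active rule set and enforce it. A sketch, assuming the `dq_rules_events` schema above, the `get_connection` helper from the Lakebase section below, and a DQX version that supports metadata-based checks (the exact `DQEngine` method name may differ in your version):

```python
import json

from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine

# Load the currently active rules for the table from Lakebase
with get_connection() as conn, conn.cursor() as cur:
    cur.execute(
        "SELECT rules FROM dq_rules_events "
        "WHERE table_name = %s AND is_active = TRUE",
        (table_name,),
    )
    checks = json.loads(cur.fetchone()[0])

# Apply them in a pipeline, the same way the validation job does
dq_engine = DQEngine(WorkspaceClient())
valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(
    spark.table(table_name), checks
)
```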
Security was a primary design concern. Different operations require different authentication strategies:
When Josh browses catalogs or queries data, her OAuth token is forwarded:
def _get_sql_connection(self):
    """SQL connection using user's token (OBO)."""
    user_token = request.headers.get('x-forwarded-access-token')

    if user_token:
        # OBO: Use Josh's token
        return sql.connect(
            server_hostname=host,
            http_path=http_path,
            access_token=user_token
        )
Why OBO? Josh should only see tables she has SELECT permission on. If she tries to access a table where she doesn't have SELECT access, she'll get a permission denied error - exactly as expected.
The Jobs API doesn't support forwarded user tokens, so we use the app's service principal:
def trigger_dq_job(self, table_name: str, user_prompt: str):
    """Trigger job with app service principal."""
    # use_user_token=False → Use SP credentials
    client = self._get_client(use_user_token=False)
    response = client.jobs.run_now(
        job_id=int(Config.DQ_GENERATION_JOB_ID),
        job_parameters={...}
    )
Security Note: The service principal has CAN_MANAGE_RUN permission on specific jobs only - it can't create new jobs or access other resources.
Lakebase uses Josh's OAuth token as her PostgreSQL password:
def get_connection():
    """PostgreSQL connection with OAuth."""
    user_email = request.headers.get('x-forwarded-email')
    user_token = request.headers.get('x-forwarded-access-token')

    return psycopg2.connect(
        host=Config.LAKEBASE_HOST,
        database=Config.LAKEBASE_DATABASE,
        user=user_email,      # Josh's email
        password=user_token,  # Her OAuth token
        sslmode='require'
    )
Why? Rule ownership is tracked. When Josh saves rules, Lakebase knows it was her - enabling audit trails and access control.
The entire application deploys using Databricks Asset Bundles (DAB):
# One command to deploy everything
databricks bundle deploy -t {env}
This deploys the Flask app, serverless jobs, notebooks, and configures all permissions automatically.
# resources/apps.yml - Key configuration
resources:
  apps:
    dqx_app:
      name: "dqx-rule-generator-${bundle.target}"
      source_code_path: ../src
      user_api_scopes:
        - sql  # Enable OBO authentication
      resources:
        - name: "generation-job"
          job:
            id: ${resources.jobs.dq_rule_generation.id}
            permission: "CAN_MANAGE_RUN"
Three environments (dev, stage, prod) with isolated configurations ensure safe promotion through the pipeline.
Standard pytest suite with mocked Databricks services:
def test_get_catalogs_with_mock(self, app):
    """Test get_catalogs with mocked SQL connection."""
    with patch.object(service, 'execute_sql') as mock_sql:
        mock_sql.return_value = ["main", "samples"]

        catalogs = service.get_catalogs()

        assert catalogs == ["main", "samples"]
CI runs unit tests on every push; integration tests require a Databricks connection and run separately.
Retrofitting OBO authentication is painful. Design your data access layer to accept tokens from the start:
# Good: Token passed explicitly
def get_catalogs(self, token: str) -> List[str]:
    ...

# Better: Token extracted from request context
def get_catalogs(self) -> List[str]:
    token = self._get_user_token()  # From x-forwarded-access-token
    ...
Rule generation time varies wildly with the size of the table being profiled and the complexity of the user's request.
Serverless jobs handle this perfectly—no cluster management, no idle costs.
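Because a run can take anywhere from seconds to a few minutes, the app has to wait on the run it triggered. A minimal polling sketch using the Databricks SDK (the helper itself is hypothetical):

```python
import time

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import RunLifeCycleState

def wait_for_run(client: WorkspaceClient, run_id: int, poll_seconds: int = 5):
    """Poll a job run until it reaches a terminal state."""
    terminal = {
        RunLifeCycleState.TERMINATED,
        RunLifeCycleState.SKIPPED,
        RunLifeCycleState.INTERNAL_ERROR,
    }
    while True:
        run = client.jobs.get_run(run_id=run_id)
        if run.state and run.state.life_cycle_state in terminal:
            return run
        time.sleep(poll_seconds)
```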
Rules change. Requirements evolve. Always keep history:
-- Query rule history for any table
SELECT version, created_at, user_prompt, rules
FROM dq_rules_events
WHERE table_name = 'catalog.schema.table'
ORDER BY version DESC;
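The `LakebaseService.get_next_version` call used in `save_rules` isn't shown above; a plausible implementation is simply MAX(version) + 1 per table. A sketch, reusing the `get_connection` helper from the Lakebase section:

```python
class LakebaseService:
    @staticmethod
    def get_next_version(table_name: str) -> int:
        """Return the next version number for a table's rule set."""
        with get_connection() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT COALESCE(MAX(version), 0) + 1 "
                "FROM dq_rules_events WHERE table_name = %s",
                (table_name,),
            )
            return cur.fetchone()[0]
```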
External services fail. Handle it gracefully:
def get_catalogs(self) -> List[str]:
    try:
        return self.execute_sql("SHOW CATALOGS")
    except Exception as e:
        logger.error(f"Catalog query failed: {e}")
        return ["main"]  # Sensible default
AI responses can be unpredictable. Log everything:
def analyze_rules(rules, table_name, prompt):
    logger.info(f"AI Analysis Request: table={table_name}, rules={len(rules)}")

    result = call_ai(...)

    logger.info(f"AI Analysis Response: score={result.get('quality_score')}")
    return result
DQX Data Quality Manager is an AI-powered Databricks App that lets you generate, validate, and version data quality rules using natural language.
Data quality has long been the domain of specialists. With DQX Data Quality Manager, we've shown that it doesn't have to stay that way: analysts and business users can describe quality requirements in plain English and get executable, validated, versioned checks in return.
The future of data quality is AI-native, user-friendly, and democratized. We've built a foundation - now it's your turn to extend it.