- Introduction: The Data Quality Paradox
- What We Built
- The Technology Stack
- Architecture Deep Dive
- The Four-Layer Architecture
- Layer 1: Users
- Layer 2: Databricks Apps Platform
- Layer 3: Compute Layer
- Layer 4: Data Layer
- Why This Design?
- The User Experience: A Complete Walkthrough
- Step 1: Discovering Data
- Step 2: Describing Quality Requirements
- Step 3: AI-Powered Rule Generation
- Step 3a: Job Triggering
- Step 3b: Data Profiling
- Step 3c: AI Generation
- Step 3d: Results Returned
- Step 4: Validating Against Real Data
- Step 5: AI Analysis and Recommendations
- Step 6: Saving with Version Control
- Security Architecture: The Three Authentication Paths
- Path 1: On-Behalf-Of (OBO) for Data Access
- Path 2: Service Principal for Jobs
- Path 3: OAuth for Lakebase
- Deployment: Infrastructure as Code
- Testing
- Lessons Learned and Best Practices
- 1. Design for OBO from Day One
- 2. Use Serverless for Unpredictable Workloads
- 3. Version Everything
- 4. Fail Gracefully
- 5. Make AI Interactions Observable
- TL;DR
- Prerequisites:
- Get Started:
- Resources:
- Conclusion
Introduction: The Data Quality Paradox
Here's a paradox every data team faces: data quality is everyone's problem, but implementing it requires specialized skills that few possess.
Data engineers understand the importance of null checks and referential integrity. Data analysts know which business rules matter. Data stewards define governance policies. But translating all of this into executable data quality checks? That requires:
- Deep knowledge of data quality frameworks
- Understanding of SQL and Python
- Familiarity with the specific DQ library's syntax
- Time to write, test, and maintain hundreds of rules
The result? Data quality becomes a bottleneck. Rules are written reactively (after problems occur), coverage is inconsistent, and there's no single source of truth for what checks exist or why they were created.
What We Built
Every organization has analysts and business users who understand their data better than anyone. They know which values should never be negative, which fields must always be filled, and which relationships must hold true. But turning that knowledge into automated data quality checks? That's where they hit a wall.
The reality today:
- Business analysts identify a data quality issue in their dashboard
- They submit a ticket to the data engineering team
- When the rule is finally implemented, it may not match what was originally needed
- Repeat for every new requirement
This creates dependency on both sides. Analysts feel powerless. Engineers feel overwhelmed. And data quality suffers.
DQX Data Quality Manager changes this dynamic.
We built an intuitive web interface that empowers analysts, data stewards, and business users to create production-ready data quality rules without writing a single line of code. They simply describe what "good data" looks like in plain English, and AI translates that description into executable checks, democratizing data quality for the entire organization.
The Technology Stack
| Component | Technology | Purpose |
|---|---|---|
| Frontend | HTML/CSS/JavaScript | Responsive web interface |
| Backend | Flask 3.0 + Gunicorn | REST API and page rendering |
| Data Quality | Databricks Labs DQX | Rule execution engine |
| AI | Model Serving Endpoint | Natural language understanding |
| Compute | Serverless Jobs | Rule generation and validation |
| Data Catalog | Unity Catalog | Data discovery and governance |
| Storage | Lakebase Autoscaling | Rule versioning and history |
| Deployment | Databricks Asset Bundles | Infrastructure as code |
| CI/CD | GitHub Actions + OIDC | Automated deployments |
Architecture Deep Dive
Before diving into features, let's understand how the pieces fit together.
The Four-Layer Architecture
Layer 1: Users
Data engineers, analysts, and stewards access the application through their browser. Each user authenticates via Databricks OAuth, and their identity flows through every operation.
Layer 2: Databricks Apps Platform
The Flask application runs on Databricks Apps, which handles:
- OAuth authentication and token management
- HTTPS termination and certificate management
- Load balancing and auto-scaling
- Request routing and header forwarding
Layer 3: Compute Layer
Two types of compute handle different workloads:
- SQL Warehouse: Executes queries for catalog browsing, data sampling, and AI analysis
- Serverless Jobs: Runs heavy compute for rule generation and validation
Layer 4: Data Layer
Three backend services serve different purposes:
- Unity Catalog: Source of truth for table metadata and data
- Model Serving: Hosts Claude AI for natural language processing
- Lakebase Autoscaling: Stores rule definitions with version history
Why This Design?
- Separation of Concerns: The Flask app handles UI and API routing, but delegates heavy computation to serverless jobs. This keeps the app responsive even during long-running operations.
- Security by Design: User tokens flow through for data access (OBO), but jobs run with controlled service principal permissions. This prevents privilege escalation while enabling necessary operations.
- Scalability: Serverless compute scales automatically. Whether one user or one hundred are generating rules simultaneously, the system handles it without pre-provisioning.
The User Experience: A Complete Walkthrough
Let's follow a data engineer named Josh as she creates quality rules for a new table.
Step 1: Discovering Data
Josh opens the DQX Data Quality Manager and is greeted by the Generator page. The first step is selecting her target table.
The catalog browser shows only what Josh has permission to access. Behind the scenes, the app executes SQL using her forwarded OAuth token:
from typing import List

# Methods of the app's Databricks service class (excerpt)
def get_catalogs(self) -> List[str]:
    """Get list of available catalogs using user's permissions."""
    try:
        # This SQL runs with the user's token (OBO)
        catalogs = self.execute_sql("SHOW CATALOGS")
        return catalogs if catalogs else ["main"]
    except Exception as e:
        print(f"Error listing catalogs: {e}")
        return ["main"]

def get_schemas(self, catalog: str) -> List[str]:
    """Get schemas in a catalog using user's permissions."""
    return self.execute_sql(f"SHOW SCHEMAS IN `{catalog}`")

def get_tables(self, catalog: str, schema: str) -> List[str]:
    """Get tables in a schema using user's permissions."""
    result = self.execute_sql_with_schema(
        f"SHOW TABLES IN `{catalog}`.`{schema}`"
    )
    return [row.get("tableName") for row in result["rows"]]
Josh selects her table (as shown in the previous image), and the app immediately displays a sample of the data:
This preview helps Josh understand the data structure before writing rules. The sample query also uses her permissions—if she can't SELECT from this table, she can't see the preview.
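Catalog, schema, and table names are interpolated into SQL text, both in the `SHOW` statements above and in the preview query. A minimal sketch of building those statements more defensively, assuming Spark SQL's rule that a backtick inside a backtick-quoted identifier is escaped by doubling it; `quote_identifier`, `build_preview_query`, and the 100-row cap are hypothetical and not taken from the app:

```python
MAX_PREVIEW_ROWS = 100  # hypothetical cap on preview size


def quote_identifier(name: str) -> str:
    """Backtick-quote one identifier part, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_preview_query(catalog: str, schema: str, table: str, limit: int = 10) -> str:
    """Build a SELECT ... LIMIT query for the data preview."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    limit = min(int(limit), MAX_PREVIEW_ROWS)
    fqn = ".".join(quote_identifier(part) for part in (catalog, schema, table))
    return f"SELECT * FROM {fqn} LIMIT {limit}"


print(build_preview_query("main", "sales", "orders", limit=5))
# SELECT * FROM `main`.`sales`.`orders` LIMIT 5
```

Because the query still runs over the user's OBO connection, quoting only protects the statement's shape; Unity Catalog remains the place where access is decided.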
Step 2: Describing Quality Requirements
Now comes the magic. Instead of writing JSON or YAML or learning DQX syntax, Josh types:
"Generate data quality rules for completeness, validity, and consistency checks."
Josh clicks "Generate Rules" and watches the progress indicator. Behind the scenes, a sophisticated generator springs into action.
Step 3: AI-Powered Rule Generation
The generation process involves multiple steps:
Step 3a: Job Triggering
The app triggers a serverless job with Josh's requirements:
from typing import Any, Dict, Optional

def trigger_dq_job(self, table_name: str, user_prompt: str,
                   sample_limit: Optional[int] = None) -> Dict[str, Any]:
    """Trigger the DQ rule generation job."""
    # Jobs use service principal (no user token scope for jobs API)
    client = self._get_client(use_user_token=False)
    job_parameters = {
        "table_name": table_name,
        "user_prompt": user_prompt
    }
    if sample_limit:
        # Job parameter values are passed as strings
        job_parameters["sample_limit"] = str(sample_limit)
    response = client.jobs.run_now(
        job_id=int(Config.DQ_GENERATION_JOB_ID),
        job_parameters=job_parameters
    )
    # run_id identifies the run for later status checks
    return {"run_id": response.run_id}
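While Josh watches the progress indicator, something has to check whether the run has finished. A minimal polling sketch; the `get_state` callback is an assumption standing in for something like `client.jobs.get_run(run_id).state.life_cycle_state.value` from the Databricks SDK, and the terminal states listed are the Jobs API run life-cycle states TERMINATED, SKIPPED, and INTERNAL_ERROR. `wait_for_run` is a hypothetical helper:

```python
import time
from typing import Callable

# Terminal run life-cycle states in the Databricks Jobs API
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(get_state: Callable[[], str], poll_seconds: float = 5.0,
                 timeout_seconds: float = 600.0,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until the run reaches a terminal life-cycle state."""
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"run still {state} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

A TERMINATED run can still have failed, so the run's result state needs a separate check. In a web app it is usually better to have the browser poll a short status endpoint than to block a Gunicorn worker inside this loop.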
Step 3b: Data Profiling
The serverless notebook profiles the table to understand its structure:
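from databricks.labs.dqx.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

# Profile the data (`spark` is the notebook's SparkSession;
# table_name and sample_limit come from the job parameters)
profiler = DQProfiler(WorkspaceClient())
profile_result = profiler.profile(
    df=spark.table(table_name).limit(sample_limit),
    output_type="dict"
)
# Extract key statistics
column_stats = []
for column_profile in profile_result["columns"]:
    column_stats.append({
        "name": column_profile["name"],
        "type": column_profile["type"],
        "null_count": column_profile["null_count"],
        "distinct_count": column_profile["distinct_count"],
        # A few example values are enough context for the model
        "sample_values": column_profile.get("sample_values", [])[:5]
    })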
Step 3c: AI Generation
The profile and user prompt are sent to Claude:
import json

from databricks.labs.dqx.generator import DQGenerator
from databricks.sdk import WorkspaceClient

# Build context for AI
context = f"""
Table: {table_name}
Column Statistics: {json.dumps(column_stats, indent=2)}
User Requirements:
{user_prompt}
Generate DQX-compatible data quality rules that address these requirements.
Consider the actual data statistics when choosing appropriate checks.
"""
# Generate rules using DQX Generator (which calls Model Serving)
generator = DQGenerator(WorkspaceClient())
rules = generator.generate_dq_rules_ai_assisted(
    user_input=context,
    input_config=table_name,
    # Summary statistics produced by the profiling step (3b)
    summary_stats=summary_stats_from_profiler
)
Step 3d: Results Returned
After 30-60 seconds, Josh sees her generated rules:
The AI understood Josh's requirements and translated them into proper DQX syntax, choosing appropriate check functions and criticality levels.
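Model output can still be malformed, so it helps to screen the generated rules before they reach the UI. A minimal structural pre-check, assuming DQX's metadata format in which each rule is a dict with an optional `name`, a `criticality` of `error` or `warn`, and a `check` holding `function` and `arguments`. `find_rule_problems` is a hypothetical helper; DQX's own check validation should remain the source of truth:

```python
from typing import Any, Dict, List

VALID_CRITICALITIES = {"error", "warn"}


def find_rule_problems(rules: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable problems found in metadata-style DQX rules."""
    problems = []
    for i, rule in enumerate(rules):
        label = rule.get("name", f"rule #{i}")
        # Criticality defaults to "error" when omitted
        if rule.get("criticality", "error") not in VALID_CRITICALITIES:
            problems.append(f"{label}: criticality must be 'error' or 'warn'")
        check = rule.get("check")
        if not isinstance(check, dict) or not check.get("function"):
            problems.append(f"{label}: missing check.function")
        elif not isinstance(check.get("arguments", {}), dict):
            problems.append(f"{label}: check.arguments must be a mapping")
    return problems
```

An empty list means the rules are at least well-formed enough to send to validation in Step 4.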
Step 4: Validating Against Real Data
Before saving, Josh wants to know: Do these rules actually work with my data?
She clicks "Validate Rules" to test against the actual table:
The validation job uses the DQX Engine to apply checks:
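from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from pyspark.sql.functions import array_contains, col

# Initialize engine
dq_engine = DQEngine(WorkspaceClient())
# Load data (`spark` is the notebook's SparkSession)
df = spark.table(table_name)
# Apply checks and split results; `checks` are dicts (metadata format),
# so the *_by_metadata_* variant is used
valid_df, invalid_df = dq_engine.apply_checks_by_metadata_and_split(df, checks)
# Calculate statistics
total_rows = df.count()
valid_rows = valid_df.count()
invalid_rows = invalid_df.count()
# Get per-rule breakdown: recent DQX versions record failed rules in the
# _errors and _warnings result columns (arrays of structs with a `name` field)
rule_results = []
for check in checks:
    rule_name = check.get("name", "unnamed")
    violations = invalid_df.filter(
        array_contains(col("_errors.name"), rule_name) |
        array_contains(col("_warnings.name"), rule_name)
    ).count()
    rule_results.append({
        "rule": rule_name,
        "violations": violations,
        # Guard against division by zero on empty tables
        "pass_rate": ((total_rows - violations) / total_rows) * 100 if total_rows else 100.0
    })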
Josh sees how the DQ rules performed against the selected table:
Step 5: AI Analysis and Recommendations
Before saving, Josh clicks "Analyze with AI" to get expert insights:
import json
from typing import Any, Dict, List

from databricks.sdk.service.sql import StatementParameterListItem

def analyze_rules(rules: List[Dict], table_name: str,
                  user_prompt: str) -> Dict[str, Any]:
    """Analyze DQ rules using AI via SQL Statement Execution."""
    analysis_prompt = f"""You are a Data Quality expert.
Analyze the following DQ rules for table '{table_name}'.
User's original requirement: {user_prompt}
Generated Rules: {json.dumps(rules, indent=2)}
Provide a JSON response with:
- summary: 2-3 sentence overview
- rule_analysis: explanation of each rule
- coverage_assessment: how well rules cover requirements
- recommendations: suggestions for additional rules
- overall_quality_score: 1-10 rating
"""
    # Execute via ai_query() SQL function (uses user's token).
    # The prompt contains quotes (e.g. "User's"), so it is bound as a named
    # parameter instead of being spliced into a SQL string literal.
    sql = f"""
        SELECT ai_query(
            '{Config.MODEL_SERVING_ENDPOINT}',
            :prompt
        ) AS analysis
    """
    # `ws` (a WorkspaceClient) and `warehouse_id` come from the app's setup
    response = ws.statement_execution.execute_statement(
        warehouse_id=warehouse_id,
        statement=sql,
        parameters=[StatementParameterListItem(name="prompt", value=analysis_prompt)],
        wait_timeout="0s"  # Async execution
    )
    # Poll for results...
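The prompt asks for a JSON response, but chat models often wrap JSON in a Markdown code fence or occasionally answer in prose. A minimal, defensive parsing sketch, assuming the raw text has already been read from the finished statement (for example, the first cell of `response.result.data_array`); `parse_analysis` and `strip_code_fence` are hypothetical helpers:

```python
import json
from typing import Any, Dict

FENCE = "`" * 3  # a Markdown code fence marker


def strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (with optional language tag)."""
    text = text.strip()
    if text.startswith(FENCE):
        newline = text.find("\n")
        text = text[newline + 1:] if newline != -1 else ""
        text = text.rstrip()
        if text.endswith(FENCE):
            text = text[:-3]
    return text.strip()


def parse_analysis(raw: str) -> Dict[str, Any]:
    """Parse the model's analysis, falling back to raw text if it isn't a JSON object."""
    try:
        result = json.loads(strip_code_fence(raw))
    except json.JSONDecodeError:
        result = None
    if not isinstance(result, dict):
        return {"summary": raw.strip(), "parse_error": True}
    score = result.get("overall_quality_score")
    # Keep the score only if it is an integer in the requested 1-10 range
    if isinstance(score, bool) or not isinstance(score, int) or not 1 <= score <= 10:
        result["overall_quality_score"] = None
    return result
```

Falling back to the raw text keeps the UI usable even when the model ignores the requested format.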
The AI analysis reports the following to Josh:
- Quality score (on the 1-10 scale requested in the prompt)
- Coverage (for example, "Good coverage of explicit requirements")
- Recommendations for additional rules
Step 6: Saving with Version Control
Satisfied with the rules, Josh clicks "Confirm & Save":
import json
import uuid
from typing import Dict, List, Optional

def save_rules(table_name: str, rules: List[Dict],
               user_prompt: str, ai_summary: Optional[Dict] = None) -> Dict:
    """Save DQ rules to Lakebase with versioning."""
    # Get next version number
    version = LakebaseService.get_next_version(table_name)
    # get_connection() is the OAuth-based psycopg2 helper shown under Path 3
    conn = get_connection()
    try:
        # `with conn` makes both statements one transaction:
        # commit on success, rollback on any exception
        with conn, conn.cursor() as cursor:
            # Deactivate previous versions
            cursor.execute("""
                UPDATE dq_rules_events
                SET is_active = FALSE
                WHERE table_name = %s AND is_active = TRUE
            """, (table_name,))
            # Insert new version and mark it active
            cursor.execute("""
                INSERT INTO dq_rules_events
                    (id, table_name, version, rules, user_prompt,
                     ai_summary, is_active, created_by, created_at)
                VALUES (%s, %s, %s, %s, %s, %s, TRUE, %s, CURRENT_TIMESTAMP)
                RETURNING id, version, created_at
            """, (
                str(uuid.uuid4()),
                table_name,
                version,
                json.dumps(rules),
                user_prompt,
                json.dumps(ai_summary) if ai_summary else None,
                "dq-rule-generator-app"
            ))
    finally:
        conn.close()
    return {
        "success": True,
        "version": version,
        "message": f"Rules saved as version {version}"
    }
Josh's rules are now:
- Stored in Lakebase under a new version number
- Linked to her original prompt (for context)
- Accompanied by the AI analysis summary
- Ready for use in production pipelines
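A production pipeline then needs the currently active rule set for its table. A minimal sketch against the `dq_rules_events` table used above, assuming a DB-API cursor such as psycopg2's; `load_active_rules` is a hypothetical helper, and because the `rules` column type isn't shown in this article, it handles both a text column and a JSON/JSONB column:

```python
import json
from typing import Any, Dict, List


def load_active_rules(cursor, table_name: str) -> List[Dict[str, Any]]:
    """Return the active rule set for a table, or [] if none has been saved."""
    cursor.execute(
        """
        SELECT version, rules
        FROM dq_rules_events
        WHERE table_name = %s AND is_active = TRUE
        ORDER BY version DESC
        LIMIT 1
        """,
        (table_name,),
    )
    row = cursor.fetchone()
    if row is None:
        return []
    _version, rules = row
    # Rules were saved with json.dumps(); a TEXT column returns a str,
    # while psycopg2 already decodes JSON/JSONB columns
    return json.loads(rules) if isinstance(rules, str) else rules
```

The returned list is in DQX's metadata format, so it could be passed to the metadata-based engine API, e.g. `DQEngine.apply_checks_by_metadata(df, checks)`.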
Security Architecture: The Three Authentication Paths
Security was a primary design concern. Different operations require different authentication strategies:
Path 1: On-Behalf-Of (OBO) for Data Access
When Josh browses catalogs or queries data, her OAuth token is forwarded:
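from databricks import sql
from flask import request

def _get_sql_connection(self):
    """SQL connection using user's token (OBO)."""
    user_token = request.headers.get('x-forwarded-access-token')
    if user_token:
        # OBO: Use Josh's token
        # (host and http_path come from the app's SQL warehouse configuration)
        return sql.connect(
            server_hostname=host,
            http_path=http_path,
            access_token=user_token
        )
    # (handling for requests without a user token is omitted here)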
Why OBO? Josh should see only the tables she has SELECT permission on. If she tries to query a table she doesn't have SELECT access to, she gets a permission-denied error, exactly as expected.
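The OBO guarantee only holds if data-access code refuses to run without the forwarded token instead of quietly using broader credentials. A minimal sketch of that guard, assuming the `x-forwarded-access-token` and `x-forwarded-email` headers used throughout this article; `get_user_identity` and `MissingUserIdentity` are hypothetical names:

```python
from typing import Mapping, Optional, Tuple


class MissingUserIdentity(Exception):
    """Raised when a request lacks the forwarded user identity headers."""


def get_user_identity(headers: Mapping[str, str]) -> Tuple[str, str]:
    """Return (email, token) from the forwarded headers, or raise."""
    token: Optional[str] = headers.get("x-forwarded-access-token")
    email: Optional[str] = headers.get("x-forwarded-email")
    if not token or not email:
        # Refuse instead of silently falling back to broader credentials
        raise MissingUserIdentity("forwarded user identity headers are missing")
    return email, token
```

In Flask you would pass `request.headers`, which matches header names case-insensitively; a plain dict, as used for quick testing, does not.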
Path 2: Service Principal for Jobs
The app's user-authorization scopes don't cover the Jobs API, so job runs are triggered with the app's service principal:
def trigger_dq_job(self, table_name: str, user_prompt: str):
    """Trigger job with app service principal."""
    # use_user_token=False → Use SP credentials
    client = self._get_client(use_user_token=False)
    response = client.jobs.run_now(
        job_id=int(Config.DQ_GENERATION_JOB_ID),
        job_parameters={...}
    )
Security Note: The service principal has CAN_MANAGE_RUN permission on specific jobs only - it can't create new jobs or access other resources.
Path 3: OAuth for Lakebase
Lakebase uses Josh's OAuth token as her PostgreSQL password:
import psycopg2
from flask import request

def get_connection():
    """PostgreSQL connection with OAuth."""
    user_email = request.headers.get('x-forwarded-email')
    user_token = request.headers.get('x-forwarded-access-token')
    return psycopg2.connect(
        host=Config.LAKEBASE_HOST,
        database=Config.LAKEBASE_DATABASE,
        user=user_email,  # Josh's email
        password=user_token,  # Her OAuth token
        sslmode='require'
    )
Why? Rule ownership is tracked. When Josh saves rules, Lakebase knows it was her - enabling audit trails and access control.
Deployment: Infrastructure as Code
The entire application deploys using Databricks Asset Bundles (DAB):
# One command to deploy everything
databricks bundle deploy -t {env}
This deploys the Flask app, serverless jobs, notebooks, and configures all permissions automatically.
# resources/apps.yml - Key configuration
resources:
  apps:
    dqx_app:
      name: "dqx-rule-generator-${bundle.target}"
      source_code_path: ../src
      user_api_scopes:
        - sql  # Enable OBO authentication
      resources:
        - name: "generation-job"
          job:
            id: ${resources.jobs.dq_rule_generation.id}
            permission: "CAN_MANAGE_RUN"
Three environments (dev, stage, prod) with isolated configurations ensure safe promotion through the pipeline.
Testing
Standard pytest suite with mocked Databricks services:
from unittest.mock import patch

def test_get_catalogs_with_mock(self, app):
    """Test get_catalogs with mocked SQL connection."""
    # `service` is the app's Databricks service instance under test
    with patch.object(service, 'execute_sql') as mock_sql:
        mock_sql.return_value = ["main", "samples"]
        catalogs = service.get_catalogs()
        assert catalogs == ["main", "samples"]
CI runs unit tests on every push; integration tests require a Databricks connection and run separately.
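The same mocking approach also covers the fallback behavior of `get_catalogs` from Step 1. A self-contained sketch; `CatalogService` is a hypothetical stand-in that copies that method's logic so the tests run without a Databricks connection:

```python
from typing import List
from unittest.mock import patch


class CatalogService:
    """Hypothetical stand-in mirroring the article's get_catalogs logic."""

    def execute_sql(self, statement: str) -> List[str]:
        raise NotImplementedError("replaced by a mock in tests")

    def get_catalogs(self) -> List[str]:
        try:
            catalogs = self.execute_sql("SHOW CATALOGS")
            return catalogs if catalogs else ["main"]
        except Exception:
            return ["main"]


def test_get_catalogs_falls_back_on_error():
    service = CatalogService()
    with patch.object(service, "execute_sql", side_effect=RuntimeError("warehouse down")):
        assert service.get_catalogs() == ["main"]


def test_get_catalogs_empty_result_uses_default():
    service = CatalogService()
    with patch.object(service, "execute_sql", return_value=[]):
        assert service.get_catalogs() == ["main"]
```

Testing the failure path matters here because a fallback that silently breaks would hide warehouse outages from users.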
Lessons Learned and Best Practices
1. Design for OBO from Day One
Retrofitting OBO authentication is painful. Design your data access layer to accept tokens from the start:
# Good: Token passed explicitly
def get_catalogs(self, token: str) -> List[str]:
    ...

# Better: Token extracted from request context
def get_catalogs(self) -> List[str]:
    token = self._get_user_token()  # From x-forwarded-access-token
    ...
2. Use Serverless for Unpredictable Workloads
Rule generation time varies wildly based on:
- Table size
- Number of columns
- Complexity of requirements
- AI model response time
Serverless jobs handle this variability well: no cluster management and no idle costs.
3. Version Everything
Rules change. Requirements evolve. Always keep history:
-- Query rule history for any table
SELECT version, created_at, user_prompt, rules
FROM dq_rules_events
WHERE table_name = 'catalog.schema.table'
ORDER BY version DESC;
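History is most useful when you can see what changed between versions. A minimal sketch that compares the `rules` JSON of two versions, for example two rows returned by the query above after `json.loads`; `diff_rule_versions` is a hypothetical helper, and it treats a modified rule as one removal plus one addition:

```python
import json
from typing import Any, Dict, List


def rule_key(rule: Dict[str, Any]) -> str:
    """Canonical string form of a rule, independent of key order."""
    return json.dumps(rule, sort_keys=True)


def diff_rule_versions(old: List[Dict[str, Any]],
                       new: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Return rules added in `new` and rules removed since `old`."""
    old_by_key = {rule_key(r): r for r in old}
    new_by_key = {rule_key(r): r for r in new}
    return {
        "added": [r for k, r in new_by_key.items() if k not in old_by_key],
        "removed": [r for k, r in old_by_key.items() if k not in new_by_key],
    }
```

Keying on the canonical JSON rather than the rule name means a rule whose arguments changed shows up in both lists, which makes edits visible instead of hiding them behind an unchanged name.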
4. Fail Gracefully
External services fail. Handle it gracefully:
def get_catalogs(self) -> List[str]:
    try:
        return self.execute_sql("SHOW CATALOGS")
    except Exception as e:
        logger.error(f"Catalog query failed: {e}")
        return ["main"]  # Sensible default
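Falling back immediately can hide short-lived failures, such as a SQL warehouse that is still starting. One option, sketched here as an assumption rather than the app's actual behavior, is a small number of retries with exponential backoff before using the default; `call_with_retry` and its parameters are hypothetical:

```python
import logging
import time
from typing import Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def call_with_retry(fn: Callable[[], T], fallback: T, attempts: int = 3,
                    base_delay: float = 0.5,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(), retrying with exponential backoff; return fallback if all attempts fail."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:
            logger.warning("attempt %d/%d failed: %s", attempt, attempts, exc)
            if attempt < attempts:
                sleep(base_delay * 2 ** (attempt - 1))
    return fallback
```

For example, `call_with_retry(lambda: self.execute_sql("SHOW CATALOGS"), fallback=["main"])`. Inside a request handler, keep `attempts` and `base_delay` small so a failing dependency doesn't stall the page.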
5. Make AI Interactions Observable
AI responses can be unpredictable. Log everything:
def analyze_rules(rules, table_name, prompt):
    logger.info(f"AI Analysis Request: table={table_name}, rules={len(rules)}")
    result = call_ai(...)
    # Key matches the JSON schema requested in the analysis prompt
    logger.info(f"AI Analysis Response: score={result.get('overall_quality_score')}")
    return result
TL;DR
DQX Data Quality Manager is an AI-powered Databricks App that lets you generate, validate, and version data quality rules using natural language.
Prerequisites:
- Databricks workspace (With Lakebase Autoscaling enabled)
- Unity Catalog enabled
- SQL Warehouse (Serverless recommended)
- Model Serving endpoint access
Get Started:
- git clone https://github.com/dediggibyte/databricks_dqx_agent.git
- cd databricks_dqx_agent
- Create your .env file with the required variables.
- databricks bundle deploy -t dev
Resources:
- https://github.com/dediggibyte/databricks_dqx_agent
- https://dediggibyte.github.io/databricks_dqx_agent/
- https://databrickslabs.github.io/dqx/
Conclusion
Data quality has long been the domain of specialists. With DQX Data Quality Manager, we've shown that:
- Natural language can replace complex syntax - Anyone can describe quality requirements
- AI understands data context - Rules are tailored to actual data characteristics
- Enterprise security is non-negotiable - OBO ensures proper access control
- Modern platforms accelerate development - Databricks Apps + DAB = rapid deployment
- Open source enables collaboration - Build on our work, contribute improvements
The future of data quality is AI-native, user-friendly, and democratized. We've built a foundation - now it's your turn to extend it.