Hi @Vivek_Patil1,
Great question -- this is a pattern we see frequently in enterprise data platforms, especially in healthcare and financial services where multi-source harmonization is critical. Here is a comprehensive architecture recommendation using native Databricks capabilities.
RECOMMENDED ARCHITECTURE: LAKEFLOW SPARK DECLARATIVE PIPELINES + CONFIG TABLES
I recommend Lakeflow Spark Declarative Pipelines (SDP, formerly Delta Live Tables) as the backbone of your framework. SDP gives you declarative data quality, automatic dependency resolution, built-in lineage via Unity Catalog, and -- critically -- the ability to dynamically generate tables from configuration using Python metaprogramming. This is the key to making it config-driven rather than hardcoded.
Here is how to structure it:
1. CONFIGURATION LAYER: DELTA TABLES AS YOUR FRAMEWORK METADATA
Store your harmonization rules in Delta tables in Unity Catalog. This makes your config queryable, versionable (via Delta time travel), and accessible from pipeline code at runtime.
harmonization_config.column_mappings -- maps source columns to harmonized target columns:
| source_system | source_object | source_column | target_object | target_column | data_type | transformation_expr | priority |
| --- | --- | --- | --- | --- | --- | --- | --- |
| salesforce | account | acct_name | customer | customer_name | STRING | TRIM(UPPER({col})) | 1 |
| sap | kna1 | name1 | customer | customer_name | STRING | TRIM(UPPER({col})) | 2 |
harmonization_config.quality_rules -- data quality expectations per target object:
| target_object | rule_name | rule_expression | action |
| --- | --- | --- | --- |
| customer | valid_customer_name | customer_name IS NOT NULL | drop |
| customer | valid_email_format | email RLIKE '^[^@]+@[^@]+$' | warn |
harmonization_config.survivorship_rules -- cross-source conflict resolution:
| target_object | target_column | resolution_strategy | priority_order |
| --- | --- | --- | --- |
| customer | customer_name | SOURCE_PRIORITY | salesforce,sap,oracle |
| customer | phone | MOST_RECENT | NULL |
| customer | email | MOST_COMPLETE | NULL |
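To make the transformation_expr column concrete: the {col} placeholder is substituted with the actual source column at runtime, so one config row describes a reusable transformation. A minimal pure-Python sketch of that substitution (the render_transformation helper name is mine, not a Databricks API):

```python
def render_transformation(transformation_expr: str, source_column: str) -> str:
    """Expand the {col} placeholder in a config-driven transformation expression."""
    if not transformation_expr:
        # No transformation configured: pass the column through unchanged
        return source_column
    return transformation_expr.replace("{col}", source_column)

# Example: the salesforce -> customer_name mapping from the table above
print(render_transformation("TRIM(UPPER({col}))", "acct_name"))
# TRIM(UPPER(acct_name))
```

The rendered string is then handed to F.expr() in the pipeline code, which is why the config stores SQL expression fragments rather than Python.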
2. DYNAMIC PIPELINE GENERATION WITH PYTHON METAPROGRAMMING
This is the most powerful pattern for config-driven pipelines. SDP lets you create tables dynamically inside Python for loops. Combined with pipeline parameters accessible via spark.conf.get(), you can build a fully generic framework.
Here is the core pattern:
```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Read config at pipeline initialization
config_df = spark.table("harmonization_config.column_mappings")
target_objects = [row.target_object for row in config_df.select("target_object").distinct().collect()]

# Read quality rules once; they are split into drop vs warn dicts per target below
quality_df = spark.table("harmonization_config.quality_rules")

for target_obj in target_objects:
    obj_mappings = config_df.filter(F.col("target_object") == target_obj).collect()

    # Build expectations dicts for drop vs warn actions
    drop_rules = {
        row.rule_name: row.rule_expression
        for row in quality_df.filter(
            (F.col("target_object") == target_obj) & (F.col("action") == "drop")
        ).collect()
    }
    warn_rules = {
        row.rule_name: row.rule_expression
        for row in quality_df.filter(
            (F.col("target_object") == target_obj) & (F.col("action") == "warn")
        ).collect()
    }

    # CRITICAL: use default-parameter binding to avoid late-binding closure issues
    @dp.materialized_view(
        name=f"harmonized_silver.{target_obj}",
        comment=f"Harmonized view of {target_obj} from multiple sources",
    )
    @dp.expect_all(warn_rules)
    @dp.expect_all_or_drop(drop_rules)
    def create_harmonized_table(_target=target_obj, _mappings=obj_mappings):
        source_dfs = []
        # _group_by_source groups mapping rows by (source_system, source_object)
        for mapping_group in _group_by_source(_mappings):
            source_sys = mapping_group[0].source_system
            source_obj = mapping_group[0].source_object
            source_df = spark.table(f"silver.{source_sys}_{source_obj}")

            select_exprs = []
            for m in mapping_group:
                if m.transformation_expr:
                    expr = m.transformation_expr.replace("{col}", m.source_column)
                    select_exprs.append(F.expr(expr).cast(m.data_type).alias(m.target_column))
                else:
                    select_exprs.append(F.col(m.source_column).cast(m.data_type).alias(m.target_column))
            select_exprs.append(F.lit(source_sys).alias("_source_system"))
            select_exprs.append(F.current_timestamp().alias("_harmonized_at"))
            source_dfs.append(source_df.select(*select_exprs))

        # Stack all sources into one harmonized DataFrame
        harmonized = source_dfs[0]
        for df in source_dfs[1:]:
            harmonized = harmonized.unionByName(df, allowMissingColumns=True)
        return harmonized
```
IMPORTANT: Note the use of default parameter binding (_target=target_obj) in the function signature. This is required to avoid Python's late-binding closure issue where all loop iterations would reference the last value. This is explicitly documented in the Python development guide.
Docs: https://docs.databricks.com/en/delta-live-tables/python-dev.html
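The late-binding pitfall is easy to demonstrate in plain Python: without default-parameter binding, every closure created in the loop sees the loop variable's final value.

```python
# Late binding: all three lambdas capture the *variable* i, not its value
late = [lambda: i for i in range(3)]
print([f() for f in late])   # [2, 2, 2]

# Default-parameter binding: the value of i is frozen at definition time
bound = [lambda _i=i: _i for i in range(3)]
print([f() for f in bound])  # [0, 1, 2]
```

The same mechanics apply to the table-generating functions above, which is why each one binds _target and _mappings as defaults.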
3. CROSS-SOURCE CONFLICT RESOLUTION
For survivorship logic (choosing which source's value "wins" when they conflict), use a dedicated resolution layer as a second materialized view:
```python
@dp.materialized_view(name="harmonized_silver.customer_resolved")
def resolve_customer():
    from pyspark.sql.window import Window

    harmonized = spark.table("LIVE.harmonized_silver.customer")
    # Derive a per-source priority from the survivorship config
    # (e.g. salesforce -> 1, sap -> 2, oracle -> 3)
    rules = spark.table("harmonization_config.survivorship_rules")
    order = rules.filter(
        (F.col("target_object") == "customer")
        & (F.col("resolution_strategy") == "SOURCE_PRIORITY")
    ).first().priority_order.split(",")
    priority_df = spark.createDataFrame(
        [(s, i + 1) for i, s in enumerate(order)],
        ["_source_system", "_source_priority"],
    )
    # Rank records per entity key by source priority and keep the winner
    w = Window.partitionBy("customer_id").orderBy("_source_priority")
    return (
        harmonized
        .join(priority_df, on="_source_system")
        .withColumn("_rank", F.row_number().over(w))
        .filter(F.col("_rank") == 1)
        .drop("_rank", "_source_priority")
    )
```
For more advanced survivorship (e.g., "take the most recent non-null value per column"), use COALESCE with window functions ordered by recency within each source.
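As a concrete illustration of the "most recent non-null value per column" semantics, here is a pure-Python sketch (the resolve_most_recent helper and field names are illustrative, not a Databricks API); in the pipeline itself you would express the same logic with window functions and COALESCE:

```python
def resolve_most_recent(records, column, ts_field="_harmonized_at"):
    """Return the non-null value of `column` from the most recent record that has one."""
    for rec in sorted(records, key=lambda r: r[ts_field], reverse=True):
        if rec.get(column) is not None:
            return rec[column]
    return None

records = [
    {"phone": None, "email": "a@x.com", "_harmonized_at": 3},
    {"phone": "555-0100", "email": None, "_harmonized_at": 2},
]
print(resolve_most_recent(records, "phone"))  # 555-0100 (newest record has no phone)
print(resolve_most_recent(records, "email"))  # a@x.com
```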
4. DATA QUALITY WITH EXPECTATIONS
SDP's expectations framework is perfect for config-driven quality rules. The expect_all, expect_all_or_drop, and expect_all_or_fail decorators accept Python dictionaries, so you can load them directly from your config tables:
```python
quality_rules = {
    "valid_customer_name": "customer_name IS NOT NULL",
    "valid_email": "email RLIKE '^[^@]+@[^@]+$'",
    "valid_country_code": "country_code IN ('US','UK','DE','FR')",
}

@dp.materialized_view(name="harmonized_silver.customer")
@dp.expect_all_or_drop(quality_rules)
def customer_harmonized():
    ...
```
Quality metrics are automatically tracked in the pipeline UI and event log -- no custom audit tables needed for DQ monitoring.
Docs: https://docs.databricks.com/en/delta-live-tables/expectations.html
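The glue between the config table and the decorators is just one dictionary per action. A pure-Python sketch of that grouping (the dicts below stand in for collected config rows; rules_by_action is my name, not an API):

```python
def rules_by_action(rows):
    """Split quality rules into {action: {rule_name: rule_expression}} dicts."""
    out = {}
    for row in rows:
        out.setdefault(row["action"], {})[row["rule_name"]] = row["rule_expression"]
    return out

rows = [
    {"rule_name": "valid_customer_name", "rule_expression": "customer_name IS NOT NULL", "action": "drop"},
    {"rule_name": "valid_email_format", "rule_expression": "email RLIKE '^[^@]+@[^@]+$'", "action": "warn"},
]
rules = rules_by_action(rows)
# rules.get("drop", {}) feeds expect_all_or_drop; rules.get("warn", {}) feeds expect_all
```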
5. LINEAGE AND AUDITABILITY
Unity Catalog provides automatic table-level and column-level lineage tracking across your entire pipeline:
- Table-level lineage: Automatically captured for all SDP pipeline operations
- Column-level lineage: Available on Databricks Runtime 13.3 LTS+
- Lineage retention: 1 year of history, visible across workspaces sharing the same metastore
For additional audit tracking, add metadata columns to every harmonized table:
```python
.withColumn("_source_system", F.lit(source_system))
.withColumn("_source_table", F.lit(source_table))
.withColumn("_harmonized_at", F.current_timestamp())
.withColumn("_pipeline_id", F.lit(spark.conf.get("pipelines.id", "unknown")))
```
Docs: https://docs.databricks.com/en/data-governance/unity-catalog/data-lineage.html
6. SCHEMA EVOLUTION
Delta Lake natively supports schema evolution:
- Use mergeSchema for additive changes (new columns from source systems)
- SDP materialized views automatically handle schema changes on refresh
- For column renaming/dropping, enable column mapping on your Delta tables
- Store your expected schema in config tables and validate against it as a quality check
Docs: https://docs.databricks.com/en/delta/update-schema.html
7. INCREMENTAL PROCESSING AT SCALE (100M+ RECORDS)
For large-scale incremental loads:
- Use streaming tables with spark.readStream for append-only ingestion from Silver into Harmonized_Silver
- Use AUTO CDC (formerly APPLY CHANGES INTO) for handling updates/deletes with SCD Type 1 or Type 2. This is especially powerful for handling out-of-order events from multiple sources.
- For pure batch with incremental refresh, materialized views automatically detect and process only changed data
Docs: https://docs.databricks.com/en/delta-live-tables/cdc.html
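To make the out-of-order point concrete: for SCD Type 1, AUTO CDC keeps the record with the highest sequence value per key, regardless of arrival order. A pure-Python sketch of that semantics (not the actual API; in the pipeline you declare this with AUTO CDC and a sequence_by column):

```python
def scd1_latest(events, key="id", seq="sequence_num"):
    """Keep the highest-sequence event per key, regardless of arrival order (SCD Type 1)."""
    latest = {}
    for e in events:  # events may arrive out of order across sources
        k = e[key]
        if k not in latest or e[seq] > latest[k][seq]:
            latest[k] = e
    return latest

events = [
    {"id": 1, "sequence_num": 2, "name": "ACME CORP"},
    {"id": 1, "sequence_num": 1, "name": "Acme"},   # late-arriving older event: ignored
    {"id": 2, "sequence_num": 5, "name": "Globex"},
]
print(scd1_latest(events)[1]["name"])  # ACME CORP
```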
8. SDP VS. STRUCTURED NOTEBOOKS
To directly answer your architecture question:
SDP (Recommended):
- Data quality: Built-in expectations with metrics
- Lineage: Automatic via Unity Catalog
- Dependency management: Automatic DAG resolution
- Dynamic table generation: Python metaprogramming in loops
- Schema evolution: Automatic handling
- Monitoring: Built-in pipeline UI
- Incremental processing: Native streaming + AUTO CDC
Structured Notebooks:
- Data quality: Must build custom
- Lineage: Manual tracking
- Dependency management: Manual orchestration
- Dynamic table generation: Full flexibility
- Schema evolution: Manual
- Monitoring: Custom dashboards
- Incremental processing: Manual checkpointing
SDP is the better choice for this use case because the declarative approach with automatic dependency resolution, built-in data quality expectations, and native lineage integration directly address your requirements.
9. ANTI-PATTERNS TO AVOID
1. Hardcoding transformations per source -- Use config tables and dynamic generation instead
2. Storing config in notebooks or JSON files -- Use Delta tables for versioning, querying, and sharing
3. Late-binding closures in Python loops -- Always use default parameter binding when creating tables in loops
4. Processing full datasets every run -- Use streaming tables or AUTO CDC for incremental processing
5. Ignoring data quality until Gold layer -- Apply expectations at Harmonized_Silver to catch issues early
6. Single monolithic pipeline -- Split by domain (Customer, Product, Order) for independent scaling and failure isolation
DOCUMENTATION REFERENCES
- Medallion Architecture: https://docs.databricks.com/en/lakehouse/medallion.html
- Lakeflow Spark Declarative Pipelines: https://docs.databricks.com/en/delta-live-tables/index.html
- Develop pipeline code with Python: https://docs.databricks.com/en/delta-live-tables/python-dev.html
- Use parameters with pipelines: https://docs.databricks.com/en/delta-live-tables/parameters.html
- Manage data quality with expectations: https://docs.databricks.com/en/delta-live-tables/expectations.html
- Change data capture with AUTO CDC: https://docs.databricks.com/en/delta-live-tables/cdc.html
- Update Delta Lake table schema: https://docs.databricks.com/en/delta/update-schema.html
- Unity Catalog data lineage: https://docs.databricks.com/en/data-governance/unity-catalog/data-lineage.html
- Configure a pipeline: https://docs.databricks.com/en/delta-live-tables/configure-pipeline.html
- Databricks Asset Bundles: https://docs.databricks.com/en/dev-tools/bundles/index.html
Hope this helps -- happy to dive deeper into any specific aspect of this architecture!
* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.