A comprehensive implementation of a scalable, extensible data ingestion and processing framework using DLT and Spark Streaming, driven entirely by configuration tables
Introduction
In modern data engineering, the ability to incrementally handle semi-structured data with evolving schemas while maintaining performance and reliability has become critical. This blog explains the implementation of a robust configuration-driven framework (CF) using DLT and Structured Streaming, addressing complex data ingestion and processing requirements. By leveraging a configuration-driven approach, we achieve the following:
Reduced Development Time: New data sources can be onboarded by simply updating configuration tables rather than writing new code, making it easy to migrate legacy systems with thousands of pipelines. Importantly, the new pipelines incorporate the best practices built into the framework.
Improved Maintainability: The clear separation between configuration and processing logic makes the system easier to understand and maintain.
Enhanced Scalability: The framework utilizes change data feed and can handle growing data volumes and complexity by leveraging DLT's native performance capabilities.
Adaptability to Change: Schema evolution and new requirements are addressed through configuration updates rather than code changes.
Consistent Processing: A standardized approach ensures all data follows the same proven processing steps, reducing errors and inconsistencies.
Beyond these benefits, the CF tackles the demanding challenges of everyday data ingestion with remarkable efficiency. Think of it as a Swiss Army knife for data processing: it handles schema evolution, captures data changes incrementally, and untangles even the most complex nested JSON structures. With the step-by-step guidance below and the complete code available in the GitHub repo, you can build a configuration-driven framework that serves as a solid foundation for future projects. The code follows Databricks' best practices, builds in scalability, and results in faster deployment times for your team.
The CF addresses technical requirements that come up repeatedly in enterprise ingestion workloads: incremental processing, schema evolution on semi-structured payloads, and change data capture for nested structures. For more details on the example, please check out the README.md and code in the GitHub repo. The remaining sections of this post provide details on the CF and the reasons for our design choices.
Solution Architecture Overview
The solution combines the capabilities of Databricks DLT with Spark Streaming to create a metadata-driven pipeline framework. The resulting pipeline follows the standard Medallion architecture: the code ingests data into the bronze layer and promotes it to the silver layer based on the configuration requirements. The gold layer is not part of this framework.
The pipeline architecture consists of the following key components: the configuration tables that drive the pipeline, a code generation notebook that turns that configuration into runnable notebooks, and the generated DLT and Spark Structured Streaming notebooks that implement the bronze and silver layers.
This architecture provides a clean separation between configuration and processing logic, enabling flexible adaptation to changing requirements without modifying core code. Let's take a closer look at the configuration tables.
The Configuration Tables
The entire pipeline is driven by four meticulously designed configuration tables that control different aspects of the data flow. The config table rows correspond to the tables that will be created in the corresponding medallion layer. Let's examine each table and its purpose:
| Configuration Table | Purpose | Details |
| --- | --- | --- |
| config_bronze_raw | Controls initial bronze layer ingestion | Source paths, binary data handling, custom schema evolution rules |
| config_bronze_childnodes_raw | Specifies the child objects in nested JSON structures that need to be processed | Sequential select expressions, field mappings, execution order |
| config_bronze | Implements change data capture for bronze tables | CDF parameters, primary key definitions, SCD Type 1 merge settings |
| config_silver | Defines business logic and final output generation in the silver layer | Join logic between bronze tables, view definitions, CDC generation for silver tables, SCD Type 2 merge |
This configuration enables the system to maintain consistent change tracking even for complex nested structures, ensuring accurate incremental processing throughout the pipeline.
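To make this concrete, onboarding a new source can be as simple as adding a row to config_bronze_raw. The column names below are illustrative assumptions for this sketch, not the exact schema used in the repo (see the README.md for that):

# Hypothetical config_bronze_raw row; the real column set is defined in the repo
new_source = [(
    "orders_raw",                      # bronze table to create
    "/Volumes/demo/landing/orders/",   # landing path of the raw files
    "value",                           # binary/JSON payload column to parse
    "merge",                           # custom schema evolution rule
)]
columns = ["table_name", "source_path", "json_column", "schema_evolution"]
spark.createDataFrame(new_source, columns).write.mode("append").saveAsTable("config_bronze_raw")

Onboarding a new source is then a matter of rows like this plus regenerating the notebooks, as described in the next sections.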
The following image shows the workflow generated and deployed using the CF. Each box in the workflow, starting with bronze_raw1, is a notebook; some run Spark Structured Streaming and some run DLT. Spark Streaming is used to make the data streaming-ready and consumable by DLT, or when the required schema evolution features are still in private preview. For further details on each of these tasks, please refer to the README.md in the GitHub repo.
The code generation notebook
A code generation notebook reads the configuration tables and creates the DLT notebooks. The key benefit of this approach is that the generated code is easy to debug and validate: rather than having DLT pipelines read directly from configuration tables, the notebook generates the underlying code for the DLT notebooks. This combines the flexibility of configuration-driven development with the reliability of generated, inspectable code. If the configuration changes, the notebooks have to be regenerated. Here we generate the notebooks up front, but we have also seen customers use the code generation notebook directly by parameterizing it.
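As a rough illustration of the idea (not the exact generator from the repo), a generation loop can read a configuration table and write one notebook per row using the Databricks Python SDK. The template, config table, and column names below (target_table, source_path) are hypothetical:

import base64
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.workspace import ImportFormat, Language

# Hypothetical template for a generated DLT table definition
DLT_TEMPLATE = '''import dlt

@dlt.table(name="{target_table}")
def {target_table}():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("{source_path}")
    )
'''

def generate_bronze_notebooks(config_table="config_bronze_raw", output_dir="/Workspace/generated"):
    w = WorkspaceClient()
    # One generated notebook per configuration row (column names are illustrative)
    for row in spark.table(config_table).collect():
        source = DLT_TEMPLATE.format(target_table=row["target_table"], source_path=row["source_path"])
        w.workspace.import_(
            path=f"{output_dir}/{row['target_table']}",
            content=base64.b64encode(source.encode()).decode(),
            format=ImportFormat.SOURCE,
            language=Language.PYTHON,
            overwrite=True,
        )

Generating plain source files like this keeps the pipeline code inspectable and versionable, which is exactly what makes debugging and validation easier.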
Implementation details
Ideally, we would implement all the functionality in DLT notebooks. However, while DLT provides many powerful capabilities, it could not natively infer schemas on binary columns, which contained most of our actual data. These binary columns hold JSON structs, a typical pattern when data is ingested from Kafka or similar streaming sources. Therefore, we built a custom Python function to handle incremental schema evolution, implemented with foreachBatch in Spark Structured Streaming, which infers and evolves the schema from these JSON strings incrementally. The code snippet below is provided for reference. The inferred schemas are stored in a Delta table, the schema_registry table. Note that schema evolution in from_json has since been released to public preview; you can explore more here.
from pyspark.sql.types import StructType, StructField, ArrayType

def merge_schemas(schema1, schema2):
    def merge_fields(fields1, fields2):
        merged_fields = list(fields1)
        field_names = set(field.name for field in fields1)
        for field in fields2:
            if field.name not in field_names:
                # Field only exists in the other schema: append it
                merged_fields.append(field)
            else:
                existing_field = next(f for f in merged_fields if f.name == field.name)
                if isinstance(field.dataType, StructType) and isinstance(existing_field.dataType, StructType):
                    # Recursively merge nested structs
                    merged_datatype = merge_schemas(existing_field.dataType, field.dataType)
                    merged_fields[merged_fields.index(existing_field)] = StructField(field.name, merged_datatype, field.nullable)
                elif isinstance(field.dataType, ArrayType) and isinstance(existing_field.dataType, ArrayType):
                    if isinstance(field.dataType.elementType, StructType) and isinstance(existing_field.dataType.elementType, StructType):
                        # Recursively merge arrays of structs element-wise
                        merged_element_type = merge_schemas(existing_field.dataType.elementType, field.dataType.elementType)
                        merged_fields[merged_fields.index(existing_field)] = StructField(field.name, ArrayType(merged_element_type, field.dataType.containsNull), field.nullable)
        return merged_fields

    return StructType(merge_fields(schema1.fields, schema2.fields))


# jsonColName, tableName, and schemaTable are supplied from the configuration tables
def process_microbatch(batch_df, batch_id):
    try:
        # Infer the schema of the incoming JSON strings with schema_of_json_agg
        incoming_schema_string = batch_df.selectExpr(f"schema_of_json_agg({jsonColName}) as schema").first()["schema"]
        existing_schema_string = spark.table(schemaTable).where(f"table_name = '{tableName}'").first()["schema"]
        # Build incoming and existing schemas from their DDL strings
        incoming_schema = StructType.fromDDL(incoming_schema_string)
        existing_schema = StructType.fromDDL(existing_schema_string)
        # Merge schemas
        merged_schema = merge_schemas(incoming_schema, existing_schema)
        # Convert the merged schema back to a DDL string
        merged_schema_string = spark._jvm.org.apache.spark.sql.types.DataType.fromJson(merged_schema.json()).toDDL()
        # Create or update the schema for this table in the schema_registry table
        schema_df = spark.createDataFrame([(tableName, merged_schema_string)], ["table_name", "schema"])
        schema_df.write.partitionBy("table_name").mode("overwrite").saveAsTable(schemaTable)
        # The remaining steps of the micro-batch (parsing the JSON column with the merged
        # schema and writing to the bronze table) are omitted here; see the GitHub repo.
    except Exception as e:
        # Fail the streaming query rather than silently skipping the batch
        raise e
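Once the latest merged schema for a table is stored in the schema_registry table, downstream (generated) code can look it up and use it to parse the raw JSON column. Below is a minimal sketch of that lookup, assuming the same schemaTable, tableName, and jsonColName variables as above and a DataFrame df holding the raw records:

from pyspark.sql import functions as F

# Fetch the most recently merged schema (as a DDL string) for this table
latest_schema_ddl = (
    spark.table(schemaTable)
    .where(f"table_name = '{tableName}'")
    .first()["schema"]
)

# Parse the JSON string column with the evolved schema and flatten it into top-level columns
parsed_df = (
    df.withColumn("payload", F.from_json(F.col(jsonColName), latest_schema_ddl))
      .select("*", "payload.*")
      .drop("payload", jsonColName)
)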
Another common characteristic of the JSON data is that child nodes extracted from array elements often lack primary keys, and making incremental updates is challenging when there are no primary keys to join on. A typical example is a customer order containing an array of order line items: without a primary key, it is difficult to know whether an order update has new, updated, or deleted items. To address this, a custom CDF generation mechanism built with Spark Structured Streaming tracks changes by creating primary keys derived from a combination of the parent ID (ORDERID) and a hash ID computed over all columns of the child. These primary keys enable incremental updates to the child tables, which are then joined together in the silver layer to aggregate the data and satisfy business rules. A sketch of the key derivation follows the streaming code below.
from functools import reduce
from pyspark.sql import functions as F

# keyColumnList, hashColumn, opcodeColumn, opcodeDeleteKeyword, bronzeTable, bronzeRawTable,
# cdfTable, checkpointLocation, and checkpointVersion are supplied from the configuration tables
def process_batch(batch_df, batch_id):
    try:
        bronze_column_list = batch_df.columns
        bronze_df = batch_df.sparkSession.read.table(bronzeTable)
        # hashColumn is generated by applying xxhash64() over all columns of the child record
        joinColumns = keyColumnList + [hashColumn]
        # Records flagged for deletion in the incoming batch
        delete_df = batch_df.filter(F.col(opcodeColumn) == opcodeDeleteKeyword)
        # Left-anti join to find records that are supposed to be inserted
        insert_df = (
            batch_df.filter(F.col(opcodeColumn) != opcodeDeleteKeyword)
            .alias("left")
            .join(bronze_df.alias("right"), joinColumns, "leftanti")
            .withColumn(opcodeColumn, F.lit("init"))
        )
        # Condition describing records whose keys match but whose hash differs (changed records)
        keyConditions = [F.col(f"tgt.{col}") == F.col(f"chg.{col}") for col in keyColumnList]
        hashCondition = F.col(f"tgt.{hashColumn}") != F.col(f"chg.{hashColumn}")
        joinCondition = reduce(lambda x, y: x & y, keyConditions + [hashCondition])
        # Existing bronze records whose key/hash combination is absent from the batch
        non_identical_records_df = bronze_df.join(batch_df, joinColumns, "leftanti")
        upsert_df = insert_df.unionByName(non_identical_records_df)
        # Write the results to the output CDF table (append mode so repeated micro-batches accumulate)
        (upsert_df.withColumn("cdc_up_at", F.current_timestamp())
            .write.format("delta").mode("append")
            .saveAsTable(cdfTable))
        (delete_df.withColumn("cdc_up_at", F.current_timestamp())
            .write.format("delta").mode("append")
            .saveAsTable(cdfTable))
    except Exception as e:
        # Fail the streaming query rather than silently skipping the batch
        raise e

# Start the streaming query
query = (
    spark.readStream.table(bronzeRawTable)
    .writeStream
    .foreachBatch(process_batch)
    .queryName("microBatchProcessing")
    .option("checkpointLocation", f"{checkpointLocation}/v=v{checkpointVersion}/")
    .outputMode("append")
    .trigger(availableNow=True)
    .start()
)
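For reference, the hashColumn and composite primary key described earlier (parent ORDERID plus a hash over all columns of the child) could be derived along these lines; orders_df and the line_items array are illustrative names, not the exact columns of the example data:

from pyspark.sql import functions as F

# Explode the child array so each order line item becomes its own row
child_df = (
    orders_df
    .select("ORDERID", F.explode("line_items").alias("item"))
    .select("ORDERID", "item.*")
)

# Surrogate key: parent ORDERID plus xxhash64() over all child columns
hash_input_cols = [c for c in child_df.columns if c != "ORDERID"]
child_with_keys_df = child_df.withColumn("hash_id", F.xxhash64(*hash_input_cols))
# (ORDERID, hash_id) then serves as the join key for the incremental merge above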
Silver tables derived from complex joins and views don't naturally support CDC: multiple upstream tables may have been updated, and it is not clear which changes should be propagated. To handle this, we generate a specialized notebook that produces the CDF for the silver view using the hash ID. The notebook is parameterized to work with any silver view and join conditions.
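As an illustration of the final step, the generated silver notebook can hand the resulting change feed to DLT's apply_changes to perform the SCD Type 2 merge defined in config_silver. The table and column names below are assumptions for this sketch, with cdc_up_at being the ordering column written during CDF generation:

import dlt

# Target streaming table for the SCD Type 2 history (hypothetical name)
dlt.create_streaming_table("silver_orders")

dlt.apply_changes(
    target="silver_orders",
    source="silver_orders_cdf",   # the generated change feed view/table (hypothetical name)
    keys=["ORDERID"],             # business key of the silver view (illustrative)
    sequence_by="cdc_up_at",      # ordering column written by the CDF generation step
    stored_as_scd_type=2,         # SCD Type 2 merge, as configured in config_silver
)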
The entire system was wrapped in CI/CD pipelines following developer best practices for deployment and management. Databricks Python SDK, Databricks Connect, and GitHub Actions were used in the CI/CD pipelines. We will explore the CI/CD portion in the second part of this blog. If you would like to jump ahead, the code for CI/CD pipelines is in the GitHub repo.
DLT-Meta can be used when your entire pipeline can be implemented in DLT. Sometimes, however, the incoming data needs pre-processing before it can be consumed by DLT; that pre-processing is handled with Spark Streaming, and that is when this interwoven framework applies.
Consider the Configuration-Driven Framework when your pipelines need this kind of pre-processing alongside DLT, for example when ingesting binary JSON payloads that require custom schema evolution, or when nested child objects need their own CDF generation before the silver layer.
Conclusion
By combining the declarative power of Databricks Delta Live Tables with the flexibility of Spark Streaming and a sophisticated configuration-driven approach, we created a highly adaptable data processing framework capable of handling complex semi-structured data with evolving schemas.
The four-table configuration framework provides a clean separation of concerns, enabling the dynamic handling of everything from schema evolution to change data capture for nested structures. This approach demonstrates how modern data engineering can leverage metadata-driven design to create systems that adapt to changing business needs without constant code rewrites.
For organizations dealing with complex and evolving data structures, this pattern provides a blueprint for building flexible and maintainable data pipelines that can grow in line with your data needs.
Call to action
The CF code, using example data, can be found in the GitHub repo: https://github.com/databricks-solutions/databricks-blogposts/tree/main/2025-04-lakeflow-config-drive.... Please clone the repository and start building your own config-driven frameworks.