Cary
Databricks Employee

Introduction

Every data warehousing project, once defined and deemed necessary, involves three primary tasks:

  1. Ingest data from the source.
  2. Populate and transform the data for reporting.
  3. Maintain history using the Slowly Changing Dimensions Type II methodology.

Before Databricks, developers needed a separate tool for each of these functions and had to maintain all of them, which added a lot of complexity.

The purpose of this quick blog is to demonstrate Databricks’ new Lakeflow Connect capabilities, combined with Delta Live Tables (DLT), performing all three of these data warehousing requirements in one package, using Microsoft SQL Server as the source database.

Setting up the Source Database for CDC

When working with large data sets, constantly processing sources from scratch tends to be an expensive and time-consuming affair.

Many “poor man’s” Change Data Capture (CDC) implementations, such as “last update date” filters or other difference operations, are often used because of a lack of control over the source database. Beyond the time and expense, these approaches also struggle to capture deletes in the source data, requiring additional operations to identify those transactions.

When the stars align and developers are able to leverage Change Data Capture in the source database, they still need a tool that can call the database procedures to retrieve the changed records and carry them forward through the ETL.

Databricks now has Lakeflow Connect, which can ingest the CDC records directly. You still, however, have to enable the CDC components on the source database, which requires the following steps:

  • Create a user in the source database with privileges to read and execute the necessary procedures and change logs. Those requirements are specified in this documentation for SQL Server.
  • Decide whether to use change data capture or change tracking; the documentation can help with the decision. Databricks requires either Microsoft change tracking or Microsoft change data capture (CDC) to extract data from SQL Server.
  • Change tracking captures the fact that rows in a table have changed, but does not capture the actual operations. Enable change tracking for the tables you want to replicate.
  • Change data capture captures every operation on a table. Enable the built-in CDC. A sketch of enabling either option follows this list.
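
Enabling either option uses standard SQL Server commands and is typically done by a DBA with elevated privileges. The sketch below runs those commands from Python via pyodbc; the server, database, table, and credential values are placeholders, and you would enable only one of the two options for a given table.

```python
# Illustrative sketch: enabling change tracking or CDC on the source SQL Server
# database via pyodbc. Server, database, table, and credentials are placeholders;
# run only the option you chose, with an account that has sufficient privileges.
import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=myserver.database.windows.net;DATABASE=mydb;"
    "UID=admin_user;PWD=<password>",
    autocommit=True,
)
cur = conn.cursor()

# Option A: change tracking (records that rows changed, not the operations themselves)
cur.execute(
    "ALTER DATABASE mydb SET CHANGE_TRACKING = ON "
    "(CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON)"
)
cur.execute("ALTER TABLE dbo.customer ENABLE CHANGE_TRACKING")

# Option B: change data capture (records every operation on the table)
cur.execute("EXEC sys.sp_cdc_enable_db")
cur.execute(
    "EXEC sys.sp_cdc_enable_table "
    "@source_schema = N'dbo', @source_name = N'customer', @role_name = NULL"
)

conn.close()
```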

Creating a connection for the source database

With the database user you created for the CDC replication, you will need to create a connection in Databricks so the replication pipeline can connect to the database and perform the ingestion/replication. In this blog we focus on connecting to an Azure SQL Server instance, though an on-premises SQL Server instance is also supported with the correct network configuration (ExpressRoute, etc.).

The steps get easier from here because everything is done in the Databricks UI. The following steps are covered in this guide but are summarized here as well.

Note that the UI combines the connection and the Unity Catalog’s catalog creation steps.

  1. Using the Databricks workspace UI, create an external connection for the SQL Server database from the Catalog > External Data > Connections page.
  2. Provide a connection name and select the “SQL Server” connection type. This opens additional steps. Select the “Username and password” Auth type (Databricks also supports OAuth). Optionally add a comment, then click “Next”.

Cary_0-1747239194502.png

Provide your connection and authentication information and continue on.

Cary_1-1747239194525.png

In the “Connection details” step, pick the appropriate certificate option for your scenario, select “Read only” for Application intent and create the connection.

Cary_2-1747239194522.png

Provide a catalog name to be used in Unity Catalog, along with the SQL Server database name that will be mapped to the new catalog. Finally, click “Create catalog”. Note that this catalog is a foreign catalog and is not the same catalog that you will use for pipeline operations later.

Cary_3-1747239194523.png

Specify the appropriate access and sharing parameters for this catalog by choosing the workspaces that can use the catalog, assigning an owner, and granting the appropriate access privileges.

Cary_4-1747239194519.png

Finally, add any tags required by your organization, or useful for your own purposes, to the connection metadata.

Cary_5-1747239194432.png

At this point the catalog is created, and you can use Catalog Explorer to view and query the SQL Server database using federated queries through Lakehouse Federation.
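
If you would rather script these steps than click through the UI, the same connection and foreign catalog can be created with Lakehouse Federation SQL from a notebook. In the sketch below, the connection, catalog, host, database, and secret scope names are all placeholders.

```python
# Sketch: creating the connection and foreign catalog in SQL instead of the UI.
# All names, the host, the database, and the secret references are placeholders.
spark.sql("""
  CREATE CONNECTION IF NOT EXISTS sqlserver_conn TYPE sqlserver
  OPTIONS (
    host 'myserver.database.windows.net',
    port '1433',
    user 'replication_user',
    password secret('my_scope', 'sqlserver_password')
  )
""")

spark.sql("""
  CREATE FOREIGN CATALOG IF NOT EXISTS sqlserver_fed
  USING CONNECTION sqlserver_conn
  OPTIONS (database 'mydb')
""")

# A federated query against the source table through the new foreign catalog
display(spark.sql("SELECT * FROM sqlserver_fed.dbo.customer LIMIT 10"))
```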

Creating the Replication Pipeline

The Lakeflow Connect replication workflow is called an “ingestion pipeline” and consists of two separate components: the gateway pipeline and the ingest pipeline.

  1. The gateway pipeline: creates a staging volume, connects to your SQL Server database, extracts a snapshot and change data, and stores it in the staging volume. The gateway stores an initial seed snapshot followed by change data. It's recommended to run the gateway as a continuous pipeline to avoid gaps in change data due to change log retention policies on the source database.
  2. The ingest pipeline: applies the initial snapshot and subsequent change data from the gateway pipeline (stored in the staging volume) into destination streaming tables. It's important to note that only one ingestion pipeline per gateway is supported. If you need to write to multiple destination catalogs or schemas, you'll need to create multiple gateway-ingestion pipeline pairs.

You can create the gateway and ingest pipelines using:

  1. Databricks User Interface (UI)
  2. Databricks Command Line Interface (CLI)
  3. Databricks Asset Bundle (DAB)
  4. A notebook

These are covered in the documentation but we’ll show the Databricks UI approach here for convenience. As always, check the documentation for the latest directions and updates.
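
For reference, here is a rough sketch of the notebook approach using the Databricks Python SDK. The pipeline definition classes and field names below are assumptions based on the pipelines API and should be verified against the current SDK documentation; the connection, catalog, and schema names are placeholders.

```python
# Rough sketch (verify class and field names against the Databricks SDK reference):
# creating the gateway and ingest pipelines programmatically. Names are placeholders.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import pipelines

w = WorkspaceClient()

# Gateway pipeline: connects to SQL Server and lands snapshots/changes in a staging volume
gateway = w.pipelines.create(
    name="sqlserver-gateway",
    gateway_definition=pipelines.IngestionGatewayPipelineDefinition(
        connection_name="sqlserver_conn",
        gateway_storage_catalog="sqlserver_replication",
        gateway_storage_schema="replicated",
        gateway_storage_name="sqlserver-gateway",
    ),
)

# Ingest pipeline: applies the staged snapshot and change data to destination tables
w.pipelines.create(
    name="sqlserver-ingest",
    ingestion_definition=pipelines.IngestionPipelineDefinition(
        ingestion_gateway_id=gateway.pipeline_id,
        objects=[
            pipelines.IngestionConfig(
                schema=pipelines.SchemaSpec(
                    source_schema="dbo",
                    destination_catalog="sqlserver_replication",
                    destination_schema="replicated",
                )
            )
        ],
    ),
)
```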

Prerequisites

Before creating the ingestion pipeline, you will need to select or create catalogs and schemas to store the staging data from the gateway pipeline and the destination tables created by the ingest pipeline. These are referred to as the “staging catalog” and the “destination catalog” in subsequent steps.

The staging and destination catalogs can be the same catalog and share the same schema, which is the approach shown here, but you can also create and use separate catalogs and/or schemas. The gateway stores the source snapshot and change data in a volume, so there is no naming collision when using the same catalog and schema for both.

Note that the catalog created during connection setup cannot be used here: it is a foreign catalog for query federation, while the ingestion requires a managed catalog.
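
If the staging/destination catalog and schema do not exist yet, they can be created up front, for example from a notebook. The names below are placeholders, and depending on your metastore configuration, CREATE CATALOG may also require a MANAGED LOCATION clause.

```python
# Sketch: create a managed catalog and schema to hold both the staging volume
# and the destination streaming tables. Names are placeholders.
spark.sql("CREATE CATALOG IF NOT EXISTS sqlserver_replication")
spark.sql("CREATE SCHEMA IF NOT EXISTS sqlserver_replication.replicated")
```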

Create The Ingestion Pipeline

From the Catalog Explorer > External Data > Connections page, use the “Create ingestion pipeline” button.

Cary_6-1747239194491.png

Enter a name for your ingestion gateway, along with the staging catalog and schema, and continue to the next step.

Cary_7-1747239194461.png

The second pipeline, the ingest pipeline (part of the overall ingestion/replication process), also requires a unique name, a connection, and a destination catalog. You can use the same catalog as the staging data (shown here) or select a different catalog. Continue on to select the source tables.

Cary_8-1747239194487.png

After a few moments, the source database, database schema, and tables (per your connection user’s permissions) will be displayed. Select the schema (shown here) or specific tables for ingestion. Selecting the schema will bring in all tables in that schema.

Cary_9-1747239194504.png

The destination schema is chosen next. Continue to choose your schedule and notification settings for this pipeline.

Cary_10-1747239194487.png

Set up your schedule and notification preferences.

Cary_11-1747239194465.png

Save and run the pipeline, or simply save and close. Saving and running will launch both the gateway and ingest pipelines.

Cary_12-1747239194503.png

After a short time, the pipeline completed successfully and displayed the results and metrics from the run. Checking the destination catalog and schema shows the two newly created tables.

Cary_13-1747239194372.png

Creating the DLT Pipeline

We are following the Databricks Medallion architecture, with a bronze layer holding the raw data from our source system and a silver layer containing the Slowly Changing Dimension Type II (SCD Type II) history of our changing business entities. We are not using a gold layer in this blog, since our intent is to show CDC and SCD Type II.

  1. Bronze - The bronze layer traditionally holds the data as it exists in the source system. In this case, that is the data replicated from SQL Server by the ingestion pipeline above: the customer and transaction tables.
  2. Silver - This layer contains the cleansed and conformed data, including aggregations and SCD2 changes. In this blog scenario the data is only lightly transformed, consisting of our bronze data plus the SCD2 history.

Native SCD Type II is on the roadmap for Databricks Lakeflow CDC. In the meantime, we will accomplish this using a pipeline that employs the “apply changes from snapshot” capability. There are different options available for apply_changes_from_snapshot but we are only using a few in this example.
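
A minimal sketch of such a pipeline is shown below, assuming the placeholder catalog, schema, and key column names used earlier in this post; the actual pipeline used in this example is shown in the screenshot that follows.

```python
# Minimal DLT sketch: build an SCD Type II table from the replicated customer
# table using apply_changes_from_snapshot. Table and key names are placeholders.
import dlt

# Target streaming table that will hold the SCD Type II history
dlt.create_streaming_table("customer_scd2")

dlt.apply_changes_from_snapshot(
    target="customer_scd2",
    # Replicated (bronze) table maintained by the Lakeflow Connect ingest pipeline
    source="sqlserver_replication.replicated.customer",
    keys=["customer_id"],      # business key identifying a customer record
    stored_as_scd_type=2,      # keep history with __START_AT / __END_AT columns
)
```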

Cary_14-1747239194434.png

This pipeline reads from the replicated customer table and applies the changes from the last version into a new SCD Type II table. Notice that in addition to the original columns, our SCD table now has ‘__START_AT’ and ‘__END_AT’ columns used to determine the ‘current’ value.

Cary_15-1747239194428.png

Querying the tables

  1. To demonstrate SCD Type II in action, we update our customer record to change the customer’s email address.
  2. Displaying the email address for our fictitious customer shows amy26@example.net.
  3. After updating this email in the source database to amy52@example.net and running the ingest and SCD Type II pipelines, we get:

Cary_16-1747239194471.png

The table now shows the updated value, with the __START_AT and __END_AT values indicating which version of the record is current.
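
To inspect the history from a notebook, a query along these lines can be used; the table, key, and email column names are the placeholders from the earlier sketches.

```python
# Sketch: view all versions of one customer, then just the current version.
# The current row is the one whose __END_AT is NULL.
history = spark.sql("""
  SELECT customer_id, email, __START_AT, __END_AT
  FROM sqlserver_replication.replicated.customer_scd2
  WHERE customer_id = 12345
  ORDER BY __START_AT
""")
display(history)

display(history.filter("__END_AT IS NULL"))
```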

Conclusion

In summary, this blog demonstrates Databricks’ ability to handle core data warehousing tasks, including data ingestion, transformation, and maintaining version history with SCD Type II, all within one platform. It covers setting up Change Data Capture on a source database, creating connections and replication pipelines, and implementing SCD Type II using DLT. The example shows how changes in source data are captured and reflected in the destination tables, including historical versions of records. Previously, developers were forced to stitch together many tools to accomplish this type of end-to-end workflow, but with Databricks it is all accomplished in one open, unified, and secure platform.