Every data warehousing project, once defined and deemed necessary, involves three primary tasks: ingesting data from source systems, transforming it, and maintaining version history (for example, with Slowly Changing Dimensions).
Before Databricks, developers needed a separate tool for each of these functions and had to maintain all of them, adding considerable complexity.
The purpose of this quick blog is to demonstrate how Databricks’ new Lakeflow Connect capabilities, combined with Delta Live Tables (DLT), satisfy all three of these data warehousing requirements in one package, using Microsoft SQL Server as the source database.
When working with large data sets, constantly reprocessing sources from scratch tends to be an expensive and time-consuming affair.
Many “poor man’s” Change Data Capture (CDC) implementations, such as “Last Update Date” or other date filters or difference operations, are often built because developers lack control over the source database. Beyond the time and expense, these approaches also struggle to capture deletes in the source data, requiring additional operations to identify those transactions.
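To make that pattern concrete, here is a minimal PySpark sketch of a “last update date” incremental load; the table names, the last_updated_ts column, and the stored watermark are all hypothetical:

```python
from pyspark.sql import functions as F

# Watermark recorded after the previous successful load (hypothetical value)
last_watermark = "2024-01-01 00:00:00"

# Pull only rows modified since the last load. Rows deleted in the source
# never show up here, which is exactly the gap described above.
incremental = (
    spark.read.table("source_db.sales.customer")      # hypothetical source table
         .filter(F.col("last_updated_ts") > F.lit(last_watermark))
)

incremental.write.mode("append").saveAsTable("bronze.customer_increment")
```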
When the stars align and developers are given the ability to leverage Change Data Capture in the source database, they still need a tool that can retrieve the changed records from the database and carry them forward through the ETL process.
Databricks now has Lakeflow Connect, which can ingest those CDC records for you. You still, however, have to enable the CDC components on the source database by following the documented steps.
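For reference, here is a minimal sketch of the standard commands for enabling CDC on a SQL Server database and table, run from Python with pyodbc. The connection string, schema, and table names are placeholders, so follow the SQL Server documentation for your environment:

```python
import pyodbc

# Placeholder connection details for the source SQL Server / Azure SQL instance
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=myserver.database.windows.net;"
    "DATABASE=SalesDB;UID=cdc_admin;PWD=<password>"
)
conn.autocommit = True
cursor = conn.cursor()

# 1. Enable CDC at the database level (requires elevated privileges)
cursor.execute("EXEC sys.sp_cdc_enable_db")

# 2. Enable CDC on each table to be replicated
cursor.execute("""
    EXEC sys.sp_cdc_enable_table
        @source_schema = N'dbo',
        @source_name   = N'customer',
        @role_name     = NULL,
        @supports_net_changes = 0
""")

cursor.close()
conn.close()
```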
With the database user you created for the CDC replication, you will need to create a connection in Databricks so the replication pipeline can connect to the database and perform the ingestion/replication. In this blog, we focus on connecting to an Azure SQL Server instance, though we also support an on-premises SQL Server instance with the correct network configuration (ExpressRoute, etc.).
The steps get easier from here, since we are working purely in the Databricks UI. The following steps are covered in this guide but are summarized here for convenience.
Note that the UI combines the connection and the Unity Catalog’s catalog creation steps.
Provide your connection and authentication information and continue on.
In the “Connection details” step, pick the appropriate certificate option for your scenario, select “Read only” for Application intent and create the connection.
Provide a catalog name to be used in Unity Catalog, and provide the SQL Server database name that will be mapped to the new catalog. Finally, click “Create catalog”. Note that this catalog is a foreign catalog and is not the catalog you will use for pipeline operations later.
Specify the appropriate access and sharing parameters for this catalog by choosing the workspaces that can use the catalog, assigning an owner, and granting the appropriate access privileges.
Finally, add any tags required by your organization, or useful for your own purposes, to the connection metadata.
At this point, the catalog will be created and you can use Catalog Explorer to view and query the SQL Server database using federated queries through Lakehouse Federation.
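For example, a quick federated query from a notebook against the new foreign catalog (the catalog, schema, and table names below are illustrative):

```python
# Query the SQL Server source directly through Lakehouse Federation
df = spark.sql("SELECT * FROM sqlserver_fed.dbo.customer LIMIT 10")
display(df)
```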
The Lakeflow Connect replication workflow, referred to as an “ingestion pipeline,” consists of two separate components: the ingestion gateway and the ingest pipeline.
You can create the gateway and ingest pipelines using the Databricks UI, or programmatically through the Databricks APIs, SDKs, or CLI. These options are covered in the documentation, but we’ll show the Databricks UI approach here for convenience. As always, check the documentation for the latest directions and updates.
Before creating the ingestion pipeline, you will need to select or create catalogs and schemas to store the staging data from the gateway pipeline and the destination tables created by the ingest pipeline. These are referred to as the “staging catalog” and the “destination catalog” in subsequent steps.
The staging and destination catalogs can be the same catalog and share the same schema, which is the approach shown here, but you can also create and use separate catalogs and/or schemas. The gateway stores the source snapshot and change data in a volume, so there is no naming collision when the same catalog and schema are used for both staging and destination.
Note that the catalog created during the connection setup cannot be used as that is a foreign catalog for query federation, and the ingestion requires a managed catalog.
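If you prefer to create the managed catalog and schema from a notebook rather than through the UI, here is a minimal sketch (lakeflow_demo and sqlserver_cdc are placeholder names):

```python
# Managed catalog and schema used for both the staging volume and the
# destination tables in this example (placeholder names)
spark.sql("CREATE CATALOG IF NOT EXISTS lakeflow_demo")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakeflow_demo.sqlserver_cdc")
```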
From the Catalog Explorer > External Data > Connections page, use the “Create ingestion pipeline” button.
Enter a name for your ingestion gateway as well as the staging catalog and schema and continue to the next step.
The second pipeline, the ingestion pipeline (part of the overall ingestion/replication process), also requires a unique name, a connection, and a destination catalog. You can use the same catalog as the staging data (shown here) or select a different catalog. Continue on to select the source tables.
After a few moments, the source database, database schema, and tables (per your connection user’s permissions) will be displayed. Select the schema (shown here) or specific tables for ingestion. Selecting the schema will bring in all tables in that schema.
The destination schema is chosen next. Continue to choose your schedule and notification settings for this pipeline.
Set up your schedule and notification preferences.
Save and run the pipeline, or save and just close. Saving and running will launch both the gateway and ingest pipelines.
After a short time, this pipeline completed successfully and displayed the results and metrics from this run. Checking the destination catalog and schema shows the two newly created tables.
We are following the Databricks Medallion architecture, with a bronze layer holding the raw data from our source system, and a silver layer containing Slowly Changing Dimension Type II (SCD Type II) tables that track the history of our changing business entities. We are not using a gold layer in this blog, as our intent is to show CDC and SCD Type II.
Native SCD Type II is on the roadmap for Databricks Lakeflow CDC. In the meantime, we will accomplish this using a pipeline that employs the “apply changes from snapshot” capability. There are several options available for apply_changes_from_snapshot, but we are only using a few in this example.
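Here is a minimal DLT Python sketch of the approach; it assumes the replicated customer table lives at lakeflow_demo.sqlserver_cdc.customer and is keyed on customer_id, so adjust the names and keys for your own tables:

```python
import dlt

# Streaming table that will hold the SCD Type II history
dlt.create_streaming_table("customer_scd2")

# Apply each new snapshot of the replicated customer table as SCD Type II;
# DLT adds the __START_AT and __END_AT columns automatically.
dlt.apply_changes_from_snapshot(
    target="customer_scd2",
    source="lakeflow_demo.sqlserver_cdc.customer",  # replicated table (placeholder name)
    keys=["customer_id"],                           # business key column(s) (placeholder)
    stored_as_scd_type=2,
)
```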
This pipeline reads from the replicated customer table and applies the changes from the last version into a new SCD Type II table. Notice that in addition to the original columns, our SCD table now has ‘__START_AT’ and ‘__END_AT’ columns used to determine the ‘current’ value.
The results display the updated value along with the __START_AT and __END_AT values that mark each version’s validity window.
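To return only the current version of each customer, filter on the open-ended __END_AT column (table name as in the sketch above, illustrative):

```python
# Current records are those whose validity window has not been closed
current = spark.sql("""
    SELECT *
    FROM lakeflow_demo.sqlserver_cdc.customer_scd2
    WHERE __END_AT IS NULL
""")
display(current)
```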
In summary, this blog demonstrates Databricks’ ability to handle core data warehousing tasks, including data ingestion, transformation, and maintaining version history with SCD Type II, all within one platform. It covers setting up Change Data Capture on a source database, creating connections and replication pipelines, and implementing SCD Type II using DLT. The example shows how changes in source data are captured and reflected in the destination tables, including historical versions of records. Previously, developers were forced to stitch together many tools to accomplish this type of end-to-end work; with Databricks, it is all accomplished in one Open, Unified and Secure platform.