As data engineering and analytics become increasingly complex, organizations often seek to combine the scalability and flexibility of the cloud with the robustness of traditional relational databases. The Databricks Lakehouse, with the recent introduction of Lakeflow Connect, bridges these two worlds, allowing teams to harness the full power of Azure SQL while leveraging Databricks' advanced capabilities for big data and AI workloads.
In this post, we'll explore the setup, features, and advantages of the Lakeflow Connect SQL Server connector, which enables seamless data ingestion from SQL Server databases into Databricks.
What is Lakeflow Connect?
Databricks Lakeflow Connect is a built-in set of ingestion connectors for enterprise applications and databases. The entire ingestion pipeline is governed by Unity Catalog and powered by serverless compute and Delta Live Tables (DLT).
Lakeflow Connect uses efficient incremental reads to make data ingestion faster, more scalable, and more cost-efficient while maintaining data freshness for downstream consumption.
Types of Connectors
Lakeflow Connect provides two categories of connectors: connectors for supported SaaS applications and database connectors. This post focuses on the database connectors.
Database connectors
A database connector is composed of the following components:
- Connection: A Unity Catalog object that stores auth details for the database.
- Gateway: Extracts data from the source database using a DLT pipeline running on classic compute.
- Staging storage: A Unity Catalog volume where data from the gateway is staged before being applied to a Delta table.
- Ingestion pipeline: A DLT serverless pipeline to ingest the staged data into the Delta tables.
Lakeflow Connect database connector architecture
Supported sources for SQL Server connector
- Azure SQL Database
- Amazon RDS for SQL Server
Configure SQL Server for ingestion
1. Create a SQL Server user for ingestion
The recommended approach is to create a database user used solely for Databricks ingestion.
The user must have the following privileges (a sample setup script follows the list):
- Read access to the following system tables and views:
- sys.databases
- sys.schemas
- sys.tables
- sys.columns
- sys.key_constraints
- sys.foreign_keys
- sys.check_constraints
- sys.default_constraints
- sys.change_tracking_tables
- sys.change_tracking_databases
- sys.objects
- sys.triggers
- Execute permission on the system stored procedures:
- sp_tables
- sp_columns
- sp_columns_100
- sp_pkeys
- sp_statistics
- SELECT on the schemas and tables you want to ingest.
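For illustration, the user creation and grants could be scripted from Python with pyodbc (an assumption; any SQL client works equally well). The connection details, user name, and password below are placeholders, and the metadata-related permissions are summarized in comments because the exact grant mechanism depends on your environment:
import pyodbc

# Admin connection to the source database; connection details are placeholders.
cur = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=my-server.database.windows.net;DATABASE=MY_DB;"
    "UID=admin_user;PWD=admin_password",
    autocommit=True,
).cursor()

# Dedicated (contained) database user used only for Databricks ingestion.
cur.execute("CREATE USER databricks_ingest WITH PASSWORD = 'REPLACE_WITH_STRONG_PASSWORD'")

# SELECT on the schema(s) to ingest.
cur.execute("GRANT SELECT ON SCHEMA::MY_DB_SCHEMA TO databricks_ingest")

# VIEW DEFINITION exposes metadata for objects in this database, which helps cover the
# database-scoped sys.* views listed above; server-scoped views (e.g. sys.databases)
# and EXECUTE on sp_tables, sp_columns, sp_columns_100, sp_pkeys, and sp_statistics
# may need to be granted separately per your environment.
cur.execute("GRANT VIEW DEFINITION TO databricks_ingest")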
2. Enable change tracking or change data capture (CDC)
Based on the SQL Server version and whether the tables have a primary key, enable change tracking or CDC.
To use Microsoft CDC, you must have SQL Server 2017 or above; to use Microsoft change tracking, you must have SQL Server 2012 or above.
When CDC is enabled, the ingestion user needs an additional read permission (see the sketch below):
- SELECT on the cdc schema
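Below is a minimal sketch of enabling change tracking or CDC with pyodbc. The connection string, database, schema, and table names are placeholders; pick the option that matches your SQL Server version and table design, and tune the retention settings for your environment:
import pyodbc

# Admin connection to the source database; connection details are placeholders.
cur = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=my-server.database.windows.net;DATABASE=MY_DB;"
    "UID=admin_user;PWD=admin_password",
    autocommit=True,
).cursor()

# Option 1: change tracking (SQL Server 2012 or above; tables need a primary key).
cur.execute(
    "ALTER DATABASE MY_DB SET CHANGE_TRACKING = ON "
    "(CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON)"
)
cur.execute(
    "ALTER TABLE MY_DB_SCHEMA.my_table ENABLE CHANGE_TRACKING "
    "WITH (TRACK_COLUMNS_UPDATED = ON)"
)

# Option 2: CDC (SQL Server 2017 or above), e.g. for tables without a primary key.
cur.execute("EXEC sys.sp_cdc_enable_db")
cur.execute(
    "EXEC sys.sp_cdc_enable_table "
    "@source_schema = N'MY_DB_SCHEMA', @source_name = N'my_table', @role_name = NULL"
)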
3. Set up DDL capture and schema evolution
The connector can track DDL changes on the ingested database objects and apply the corresponding schema changes to the destination tables, or add new tables in the case of full schema replication.
DDL capture requires additional database objects, which are set up automatically once the SQL Server user is granted the following permissions (see the sketch after this list):
- CREATE PROCEDURE on the database
- CREATE TABLE on the database
- SELECT, EXECUTE, and INSERT on the schema
- ALTER on the database
- ALTER on the schema or on all tables to ingest
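These grants could be applied the same way as the earlier user setup; a minimal pyodbc sketch with placeholder names:
import pyodbc

# Admin connection to the source database; connection details are placeholders.
cur = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=my-server.database.windows.net;DATABASE=MY_DB;"
    "UID=admin_user;PWD=admin_password",
    autocommit=True,
).cursor()

# Database-level permissions required for DDL capture.
for stmt in [
    "GRANT CREATE PROCEDURE TO databricks_ingest",
    "GRANT CREATE TABLE TO databricks_ingest",
    "GRANT ALTER ON DATABASE::MY_DB TO databricks_ingest",
    # Schema-level permissions on the schema(s) to ingest.
    "GRANT SELECT, EXECUTE, INSERT ON SCHEMA::MY_DB_SCHEMA TO databricks_ingest",
    "GRANT ALTER ON SCHEMA::MY_DB_SCHEMA TO databricks_ingest",
]:
    cur.execute(stmt)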
Databricks Setup
Prerequisites
- The workspace is Unity Catalog enabled.
- Serverless compute should be enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.
- To create a connection: CREATE CONNECTION on the metastore.
To use an existing connection: USE CONNECTION or ALL PRIVILEGES on the connection.
- USE CATALOG on the target catalog.
- USE SCHEMA, CREATE TABLE, and CREATE VOLUME on an existing schema or CREATE SCHEMA on the target catalog.
- CREATE VOLUME on an existing schema (used for the staging volume); example grants are sketched below.
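The Unity Catalog privileges above can be granted with SQL; here is a minimal sketch run from a Databricks notebook, where the principal `data_engineers` and the catalog and schema names are placeholders:
# `spark` is the SparkSession available in a Databricks notebook.
principal = "`data_engineers`"  # placeholder group name

for stmt in [
    f"GRANT CREATE CONNECTION ON METASTORE TO {principal}",
    f"GRANT USE CATALOG ON CATALOG main TO {principal}",
    f"GRANT USE SCHEMA, CREATE TABLE, CREATE VOLUME "
    f"ON SCHEMA main.lakeflow_sqlserver_connector_cdc TO {principal}",
]:
    spark.sql(stmt)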
Configure Unity Catalog resources
The following Unity Catalog objects have to be created (a sketch using the Python SDK follows this list):
- Create a SQL Server connection.
- Create a staging catalog and schemas.
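A minimal sketch of creating these objects with the databricks-sdk; the connection can also be created through the Catalog Explorer UI. The connection option keys shown (host, port, user, password) are assumptions to verify against the current documentation, and all names and credentials are placeholders:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog

w = WorkspaceClient()

# Unity Catalog connection that stores the SQL Server credentials.
connection = w.connections.create(
    name="my_connection",
    connection_type=catalog.ConnectionType.SQLSERVER,
    options={
        "host": "my-server.database.windows.net",
        "port": "1433",
        "user": "databricks_ingest",
        "password": "REPLACE_WITH_PASSWORD",
    },
)

# Schema that will hold the staging volume and the replicated tables
# (the catalog "main" is assumed to exist already).
w.schemas.create(name="lakeflow_sqlserver_connector_cdc", catalog_name="main")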
Data Ingestion
To ingest data, a gateway and an ingestion pipeline need to be created using the databricks-sdk or the UI. As explained earlier, the gateway pipeline extracts data from the source database using a DLT pipeline on classic compute and stages it in a UC volume; the ingestion pipeline is a serverless DLT pipeline that ingests the staged data from the UC volume into the Delta tables.
Refer to the sample code below to configure and create the gateway and ingestion pipelines.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog, jobs, pipelines

w = WorkspaceClient()

# ======================
# Setup
# ======================

# The following function simplifies the replication of multiple tables from the same schema
def replicate_tables_from_db_schema(db_catalog_name, db_schema_name, db_table_names):
    return [pipelines.IngestionConfig(
        table=pipelines.TableSpec(
            source_catalog=db_catalog_name,
            source_schema=db_schema_name,
            source_table=table_name,
            destination_catalog=target_catalog_name,
            destination_schema=target_schema_name,
        )) for table_name in db_table_names]

# The following function simplifies the replication of entire DB schemas
def replicate_full_db_schema(db_catalog_name, db_schema_names):
    return [pipelines.IngestionConfig(
        schema=pipelines.SchemaSpec(
            source_catalog=db_catalog_name,
            source_schema=db_schema_name,
            destination_catalog=target_catalog_name,
            destination_schema=target_schema_name,
        )) for db_schema_name in db_schema_names]

gateway_cluster_spec = None

# The name of the UC connection with the credentials to access the source database
connection_name = "my_connection"

# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = "lakeflow_sqlserver_connector_cdc"

# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data.
# Use the destination catalog/schema by default
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name

# The name of the Gateway pipeline to create
gateway_pipeline_name = "cdc_gateway"

# The name of the Ingestion pipeline to create
ingestion_pipeline_name = "cdc_ingestion"

# Construct the complete list of tables to replicate
# IMPORTANT: The letter case of the catalog, schema and table names MUST MATCH EXACTLY
# the case used in the source database system tables
tables_to_replicate = replicate_full_db_schema("MY_DB", ["MY_DB_SCHEMA"])
# Append tables from additional schemas as needed
# + replicate_tables_from_db_schema("REPLACE_WITH_DBNAME", "REPLACE_WITH_SCHEMA_NAME_2", ["table3", "table4"])

# Customize who gets notified about failures
notifications = [
    pipelines.Notifications(
        email_recipients=[w.current_user.me().user_name],
        alerts=["on-update-failure", "on-update-fatal-failure", "on-flow-failure"],
    )
]

# Create a gateway pipeline
# Determine the connection id
connection_id = w.connections.get(connection_name).connection_id

gateway_def = pipelines.IngestionGatewayPipelineDefinition(
    connection_id=connection_id,
    gateway_storage_catalog=stg_catalog_name,
    gateway_storage_schema=stg_schema_name,
    gateway_storage_name=gateway_pipeline_name,
)

p = w.pipelines.create(
    name=gateway_pipeline_name,
    gateway_definition=gateway_def,
    notifications=notifications,
    clusters=[gateway_cluster_spec.as_dict()] if gateway_cluster_spec is not None else None,
)
gateway_pipeline_id = p.pipeline_id
print(f"Gateway pipeline {gateway_pipeline_name} created: {gateway_pipeline_id}")

# Create an ingestion pipeline
ingestion_def = pipelines.ManagedIngestionPipelineDefinition(
    ingestion_gateway_id=gateway_pipeline_id,
    objects=tables_to_replicate,
)

p = w.pipelines.create(
    name=ingestion_pipeline_name,
    ingestion_definition=ingestion_def,
    notifications=notifications,
    serverless=True,
    photon=True,
    continuous=False,
)
ingestion_pipeline_id = p.pipeline_id
print(f"Ingestion pipeline {ingestion_pipeline_name} created: {ingestion_pipeline_id}")
Code 1. Gateway and ingestion pipeline creation
The gateway and ingestion pipelines are created as shown below.
Gateway and ingestion pipelines
A sample run of the ingestion pipeline.
Ingestion pipeline sample run
Next, make additional changes (updates and deletes) to the source SQL Server table.
Updates and deletes in the source table
Trigger a run of the ingestion pipeline; after it completes, the updated data can be viewed in the staging Delta table. A programmatic way to trigger the run is sketched below.
The staging table was updated with the latest data after running the Ingestion pipeline
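For reference, a run of the ingestion pipeline can also be triggered programmatically with the databricks-sdk, reusing the pipeline ID captured in Code 1 (a minimal sketch):
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# ID returned when the ingestion pipeline was created in Code 1.
ingestion_pipeline_id = "REPLACE_WITH_INGESTION_PIPELINE_ID"

# Trigger an update of the ingestion pipeline.
update = w.pipelines.start_update(pipeline_id=ingestion_pipeline_id)
print(f"Started update {update.update_id} for pipeline {ingestion_pipeline_id}")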
Key advantages of using Lakeflow Connect
Managed Setup
No additional resources need to be provisioned to ingest data from SQL Server into Databricks.
Secure Connectivity
The Azure SQL connector uses Transport Layer Security (TLS) to establish a secure connection between Databricks and your SQL database. This ensures that data transfer occurs over an encrypted channel, protecting sensitive information in transit.
Credential Management with Unity Catalog
Credentials for connecting to Azure SQL are stored securely within the Unity Catalog. This centralized approach to credential management offers several benefits:
- Enhanced security through access controls
- Simplified credential rotation and management
- Auditability of credential usage
Only users with appropriate permissions can retrieve and use these credentials when running ingestion flows, as in the example grant below.
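For instance, access to the stored credentials can be limited to a specific group (a sketch run from a Databricks notebook; `data_engineers` is a placeholder group name):
# `spark` is the SparkSession available in a Databricks notebook.
# Grant only the ingestion team the ability to use the connection's credentials.
spark.sql("GRANT USE CONNECTION ON CONNECTION my_connection TO `data_engineers`")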
Scalable Ingestion Pipeline
The connector leverages Databricks DLT capabilities to create a scalable ingestion pipeline. This allows you to efficiently handle large volumes of data with essential features such as progress tracking, error logging and alerting.
- Failures in pipelines are gracefully handled without loss of data.
- Currently, there is no additional cost for Lakeflow Connect. Customers are only billed for the Delta Live Tables usage needed to load data from the source to the staging volume and from the staging volume to the staging table.
Limitations
- Schema evolution such as dropping a column is not supported; a full table refresh is required to capture such schema updates.
- Deleting the pipeline deletes the staging volume and the staging tables.
- When a source table is deleted, the destination table is automatically deleted.
- Reverse ETL is not supported.
- The gateway must run in Classic mode.
- The ingestion pipeline must run in Serverless mode.
- Only triggered mode for running ingestion pipelines is supported.
- The ingestion pipeline supports only one destination catalog and schema. To write to multiple destination catalogs or schemas, create multiple gateway-ingestion pipeline pairs.
- When you run a scheduled pipeline, alerts don’t trigger immediately. Instead, they trigger when the next update runs.
Conclusion
The Databricks Lakeflow Connect SQL Server connector offers a robust set of features designed to simplify and secure your data ingestion process without the need to set up any additional cloud infrastructure. By leveraging Lakeflow Connect, organizations can efficiently bring their data from SaaS applications or databases into the Databricks environment, enabling advanced analytics, machine learning, and business intelligence use cases on a unified platform.