cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Parametrize the DLT pipeline for dynamic loading of many tables

databrciks
New Contributor III

I need to load many tables into Bronze layer connecting to sql server DB.

How can i pass the tables names dynamically in DLT. Means one code pass many tables and load into bronze layer

1 ACCEPTED SOLUTION

Accepted Solutions

Ashwin_DSA
Databricks Employee
Databricks Employee

Hi @databrciks,

To make sure I've understood your query... Am I right in saying you want to ingest many SQL Server tables into a Bronze layer using DLT with a single reusable pipeline, where table names are passed dynamically rather than writing separate code for each table?

If so, I would recommend using a metadata-driven DLT pattern... You can put the list of SQL Server tables in pipeline configuration, then loop over that list in your DLT notebook and define one table per entry via a factory function:

Here is a sample.. 

 

import dlt
from pyspark.sql import functions as F

jdbc_url = spark.conf.get("pipeline.jdbc_url")
user = dbutils.secrets.get("sqlserver-scope", "user")
password = dbutils.secrets.get("sqlserver-scope", "password")

connection_props = {
    "user": user,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

SOURCE_TABLES = [
    t.strip()
    for t in spark.conf.get("pipeline.source_tables").split(",")
    if t.strip()
]

def make_bronze_table(src_table: str):
    table_name = src_table.replace(".", "_").lower()   # dbo.Customers -> dbo_customers

    @dlt.table(
        name=f"br_{table_name}",
        comment=f"Bronze copy of {src_table} from SQL Server"
    )
    def _bronze():
        return (
            spark.read.format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", src_table)
            .options(**connection_props)
            .load()
            .withColumn("_ingest_ts", F.current_timestamp())
        )

for t in SOURCE_TABLES:
    make_bronze_table(t)
 

In the pipeline config you then set, for example:

{
  "configuration": {
    "pipeline.jdbc_url": "jdbc:sqlserver://<server>:1433;databaseName=<db>",
    "pipeline.source_tables": "dbo.Customers,dbo.Orders,dbo.Products"
  }
}
One DLT pipeline will then generate and load all Bronze tables dynamically.
 
Or, were you expecting somethign else?
 

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

View solution in original post

3 REPLIES 3

szymon_dybczak
Esteemed Contributor III

Hi @databrciks ,

You can use pipeline parameters. Below you'll find some examples:

Use parameters with pipelines - Azure Databricks | Microsoft Learn

Ashwin_DSA
Databricks Employee
Databricks Employee

Hi @databrciks,

To make sure I've understood your query... Am I right in saying you want to ingest many SQL Server tables into a Bronze layer using DLT with a single reusable pipeline, where table names are passed dynamically rather than writing separate code for each table?

If so, I would recommend using a metadata-driven DLT pattern... You can put the list of SQL Server tables in pipeline configuration, then loop over that list in your DLT notebook and define one table per entry via a factory function:

Here is a sample.. 

 

import dlt
from pyspark.sql import functions as F

jdbc_url = spark.conf.get("pipeline.jdbc_url")
user = dbutils.secrets.get("sqlserver-scope", "user")
password = dbutils.secrets.get("sqlserver-scope", "password")

connection_props = {
    "user": user,
    "password": password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

SOURCE_TABLES = [
    t.strip()
    for t in spark.conf.get("pipeline.source_tables").split(",")
    if t.strip()
]

def make_bronze_table(src_table: str):
    table_name = src_table.replace(".", "_").lower()   # dbo.Customers -> dbo_customers

    @dlt.table(
        name=f"br_{table_name}",
        comment=f"Bronze copy of {src_table} from SQL Server"
    )
    def _bronze():
        return (
            spark.read.format("jdbc")
            .option("url", jdbc_url)
            .option("dbtable", src_table)
            .options(**connection_props)
            .load()
            .withColumn("_ingest_ts", F.current_timestamp())
        )

for t in SOURCE_TABLES:
    make_bronze_table(t)
 

In the pipeline config you then set, for example:

{
  "configuration": {
    "pipeline.jdbc_url": "jdbc:sqlserver://<server>:1433;databaseName=<db>",
    "pipeline.source_tables": "dbo.Customers,dbo.Orders,dbo.Products"
  }
}
One DLT pipeline will then generate and load all Bronze tables dynamically.
 
Or, were you expecting somethign else?
 

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

databrciks
New Contributor III

Hi Ashwin 

Thanks for the quick response. Yes I want to pass all the tables through config parameter/param file and load that into bronze layer

I will try this approach. 

Thanks