
DELTA LIVE TABLE - Parallel processing

JUMAN4422
New Contributor III

How can we process multiple tables in parallel within a Delta Live Table pipeline, passing the table names as parameters?

7 REPLIES

Alberto_Umana
Databricks Employee

To process multiple tables within a Delta Live Table (DLT) pipeline in parallel using table names as parameters, you can leverage the flexibility of the DLT Python API. Here's a step-by-step guide on how to achieve this:

 

1. Define the Tables Dynamically: use the @dlt.table decorator to define your tables. You can create a function that takes a table name as a parameter and dynamically generates the required table.
2. Use the dlt.read or spark.read.table Functions: these let you read from other tables within the same pipeline. Use the LIVE keyword to reference tables defined in the same pipeline (see the short sketch after this list).
3. Parallel Processing: while DLT manages the orchestration of tasks, you can define multiple tables in your pipeline, and DLT will handle their dependencies and execution order. Ensure that your tables are defined in a way that allows DLT to infer the dependencies correctly.
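
A minimal sketch of step 2 (names like source_database.table1 are placeholders): both dlt.read and spark.read.table with the LIVE keyword reference a table defined in the same pipeline, and DLT infers the dependency from that reference.

import dlt

@dlt.table
def bronze():
    # Placeholder source table; replace with your own
    return spark.read.table("source_database.table1")

@dlt.table
def silver():
    # Either form references the in-pipeline "bronze" table; DLT infers the dependency from it
    return dlt.read("bronze")  # equivalently: spark.read.table("LIVE.bronze")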

 

Here's an example of how you can define multiple tables dynamically:

 

import dlt

# Function to create a table definition for a given source table name
def create_table(table_name):
    @dlt.table(name=table_name)
    def table_def():
        return spark.read.table(f"source_database.{table_name}")
    return table_def

# List of table names to process
table_names = ["table1", "table2", "table3"]

# Create tables dynamically
for table_name in table_names:
    create_table(table_name)


If we use a for loop to pass the table names, will they be handled one by one?
If yes, can you suggest another method? I need to process 'n' tables at a time.

JUMAN4422
New Contributor III

Can we run a DLT pipeline multiple times at the same time, with different parameters, using REST API calls with asyncio?

I have created a function to start the pipeline using the REST API.
When calling the function with asyncio, I get a [409 Conflict] error.
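
For context, a pipeline allows only one active update at a time, so concurrent start-update calls against the same pipeline typically return 409 Conflict; concurrency only helps across different pipelines. A minimal sketch of the asyncio call pattern, assuming the standard POST /api/2.0/pipelines/{pipeline_id}/updates endpoint (the host, token, and pipeline IDs below are placeholders):

# Sketch only: start several pipeline updates concurrently with asyncio.
# Assumes the standard start-update endpoint POST /api/2.0/pipelines/{pipeline_id}/updates;
# the host, token, and pipeline IDs are placeholders.
import asyncio
import requests

HOST = "https://<workspace-host>"
TOKEN = "<personal-access-token>"

def start_update(pipeline_id: str, full_refresh: bool = False) -> dict:
    resp = requests.post(
        f"{HOST}/api/2.0/pipelines/{pipeline_id}/updates",
        headers={"Authorization": f"Bearer {TOKEN}"},
        json={"full_refresh": full_refresh},
    )
    resp.raise_for_status()  # a 409 here usually means an update is already active on that pipeline
    return resp.json()

async def main(pipeline_ids):
    # requests is blocking, so run each call in a worker thread
    results = await asyncio.gather(
        *[asyncio.to_thread(start_update, pid) for pid in pipeline_ids],
        return_exceptions=True,
    )
    for pid, result in zip(pipeline_ids, results):
        print(pid, result)

# asyncio.run(main(["<pipeline-id-1>", "<pipeline-id-2>"]))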



ChantellevdWalt
New Contributor II

@Alberto_Umana where you're ingesting the list table_names = ["table1", "table2", "table3"], can I replace it with the row values from a DLT view?
When I've tried using a @dlt.view, I run into an error saying I need to iterate within the confines of a DLT structure, and if I use the rows from a @dlt.table I run into a "table not found" error, which I think is a limitation of how DLT sets up the DAG/relationships before actual processing?
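
One workaround, sketched here rather than confirmed in this thread: the list of names has to exist before DLT builds the graph, so read it from a regular catalog table with spark.read.table(...).collect() at definition time instead of from a @dlt.view or @dlt.table. The config_db.table_list_config table and its table_name column below are hypothetical.

# Sketch: drive table creation from a metadata table that lives outside the pipeline.
# "config_db.table_list_config" and its "table_name" column are hypothetical names.
import dlt

table_names = [
    row["table_name"]
    for row in spark.read.table("config_db.table_list_config").select("table_name").collect()
]

def define_table(name: str):
    @dlt.table(name=name)
    def _t():
        return spark.read.table(f"source_database.{name}")
    return _t

for name in table_names:
    define_table(name)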

swatkat
Visitor

@JUMAN4422, if you have found any solution to this, please post it.

JUMAN4422
New Contributor III

 

You can use a for loop; the tables will be processed in parallel based on the cluster size.
Define the DLT logic in a function:

def dlt_logic(table_name):
    ...

then pass your table names in a list to the function:

table_names = ["table1", "table2", "table3"]

for table_name in table_names:
    dlt_logic(table_name)

 

iyashk-DB
Databricks Employee

DLT analyzes your code to build a dependency graph (DAG) and schedules independent flows concurrently up to the available compute; you don't have to orchestrate parallelism yourself if flows don't depend on each other.

Parameterise a list of table names and generate per-table flows (Python)

Use a pipeline configuration parameter (for example, table_list) and read it from your notebook. Then, create DLT tables in a loop using a small function factory so each table gets its own definition, which DLT will parallelize when they're independent.

# Python (DLT)
import dlt
from pyspark.sql.functions import *

# 1) Read list of tables from pipeline parameter "table_list", e.g., "customers,orders,products"
tables = [t.strip() for t in spark.conf.get("table_list").split(",")]

# 2) Use a function factory to avoid late-binding issues in loops
def define_bronze(name: str):
    @dlt.table(name=f"{name}_bronze", comment=f"Bronze ingestion for {name}")
    def _bronze():
        # Example: Auto Loader per-table path; adapt format/path/options to your sources
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("inferSchema", True)
            .load(f"/mnt/data/{name}")  # e.g., one folder per table name
        )
    return _bronze

def define_silver(name: str):
    @dlt.table(name=f"{name}_silver", comment=f"Silver cleansing for {name}")
    def _silver():
        # Example transformation; replace with your logic
        return dlt.read_stream(f"{name}_bronze").select("*")
    return _silver

# 3) Instantiate a bronze+silver flow for each table name
for n in tables:
    define_bronze(n)
    define_silver(n)

Because DLT evaluates decorators lazily, you must create datasets inside separate functions when looping; otherwise, you'll accidentally capture the last loop variable value for all tables.
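
To illustrate that last point with plain Python (general language behavior, not DLT-specific): a closure created in a loop captures the loop variable itself, not its value at that iteration, so without a function factory every table definition would see the final name.

# Plain-Python illustration of the loop/closure pitfall described above
funcs = []
for name in ["table1", "table2", "table3"]:
    funcs.append(lambda: name)           # captures the variable, not its current value
print([f() for f in funcs])              # ['table3', 'table3', 'table3']

# Binding the value through a function parameter (as define_bronze/define_silver do) fixes it
def make_func(n):
    return lambda: n

funcs = [make_func(name) for name in ["table1", "table2", "table3"]]
print([f() for f in funcs])              # ['table1', 'table2', 'table3']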