12-11-2024 11:09 PM
How can we process multiple tables within a Delta Live Tables pipeline in parallel, passing the table names as parameters?
12-12-2024 04:54 AM
To process multiple tables within a Delta Live Tables (DLT) pipeline in parallel using table names as parameters, you can leverage the flexibility of the DLT Python API. Here's an example of how you can define multiple tables dynamically:
import dlt
from pyspark.sql.functions import col

# Function factory: defines one DLT table per source table name
def create_table(table_name):
    @dlt.table(name=table_name)
    def table_def():
        return spark.read.table(f"source_database.{table_name}")

# List of table names to process
table_names = ["table1", "table2", "table3"]

# Create tables dynamically
for table_name in table_names:
    create_table(table_name)
12-15-2024 08:49 PM
If we use a for loop to pass the table names, they will be handled one by one, right?
If so, can you suggest other methods? I need to process 'n' tables at a time.
12-17-2024 11:28 PM
Can we run a DLT pipeline multiple times at the same time, with different parameters, using a REST API call with asyncio?
I have created a function to start the pipeline using the REST API.
When calling the function with asyncio, I am getting a 409 Conflict error.
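For illustration, a minimal sketch of how such concurrent starts might look with asyncio and aiohttp (the host, token, and pipeline IDs below are placeholders, not values from the question):

import asyncio
import aiohttp

DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"  # placeholder
TOKEN = "<personal-access-token>"  # placeholder

async def start_pipeline_update(session, pipeline_id):
    # POST /api/2.0/pipelines/{pipeline_id}/updates starts one update of a pipeline
    url = f"{DATABRICKS_HOST}/api/2.0/pipelines/{pipeline_id}/updates"
    async with session.post(url, json={"full_refresh": False}) as resp:
        return pipeline_id, resp.status, await resp.json()

async def main(pipeline_ids):
    headers = {"Authorization": f"Bearer {TOKEN}"}
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [start_pipeline_update(session, pid) for pid in pipeline_ids]
        return await asyncio.gather(*tasks)

results = asyncio.run(main(["<pipeline-id-1>", "<pipeline-id-2>"]))

Note that a given DLT pipeline can only have one active update at a time, so firing concurrent start requests against the same pipeline_id is the usual cause of a 409 Conflict; running concurrently with different parameters generally requires separate pipelines, or serializing the updates.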
02-25-2025 09:33 AM
@Alberto_Umana where you're ingesting the list table_names = ["table1", "table2", "table3"], can I replace this with the row values from a DLT view?
When I tried using @dlt.view, I ran into an error saying I need to iterate within the confines of a DLT structure, and if I use the rows from a @dlt.table I run into a "table not found" error, which I think is a limitation of how DLT sets up the DAG/relationships before actual processing.
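One workaround that is sometimes used (a sketch under assumptions, not a confirmed answer): because DLT resolves the DAG before any flow runs, the list of names has to be available at definition time, for example by reading an ordinary, non-DLT metadata table outside of any @dlt decorator. The config table and column names below are hypothetical:

import dlt

# Hypothetical metadata table holding one row per table to ingest
table_names = [
    row["table_name"]
    for row in spark.read.table("config_db.table_list").select("table_name").collect()
]

def define_table(name: str):
    @dlt.table(name=name)
    def _t():
        return spark.read.table(f"source_database.{name}")  # placeholder source

for name in table_names:
    define_table(name)

The collect() runs while DLT is building the graph, so the resulting tables are known up front; a @dlt.view cannot drive the loop because its rows only exist once the pipeline is already executing.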
4 hours ago
@JUMAN4422, if you have found any solution to this, please post it.
4 hours ago
You can use a for loop; the tables it defines will be processed in parallel, based on the cluster size.
Define the DLT logic in a function:
def dlt_logic(table_name):
    ...
Then pass your table names to the function in a list:
table_names = ["table1", "table2", "table3"]
for table_name in table_names:
    dlt_logic(table_name)
26m ago
DLT analyzes your code to build a dependency graph (DAG) and schedules independent flows concurrently up to the available compute; you don't have to orchestrate parallelism yourself if flows don't depend on each other.
Use a pipeline configuration parameter (for example, table_list) and read it from your notebook. Then, create DLT tables in a loop using a small function factory so each table gets its own definition, which DLT will parallelize when they're independent.
# Python (DLT)
import dlt
from pyspark.sql.functions import *

# 1) Read list of tables from pipeline parameter "table_list", e.g., "customers,orders,products"
tables = [t.strip() for t in spark.conf.get("table_list").split(",")]

# 2) Use a function factory to avoid late-binding issues in loops
def define_bronze(name: str):
    @dlt.table(name=f"{name}_bronze", comment=f"Bronze ingestion for {name}")
    def _bronze():
        # Example: Auto Loader per-table path; adapt format/path/options to your sources
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")  # Auto Loader's column-type inference option
            .load(f"/mnt/data/{name}")  # e.g., one folder per table name
        )
    return _bronze

def define_silver(name: str):
    @dlt.table(name=f"{name}_silver", comment=f"Silver cleansing for {name}")
    def _silver():
        # Example transformation; replace with your logic
        return dlt.read_stream(f"{name}_bronze").select("*")
    return _silver

# 3) Instantiate a bronze+silver flow for each table name
for n in tables:
    define_bronze(n)
    define_silver(n)
Because DLT evaluates decorators lazily, you must create datasets inside separate functions when looping; otherwise, you'll accidentally capture the last loop variable's value for all tables.
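To illustrate that pitfall, here is a minimal sketch (the source_database path is a placeholder): defining the dataset directly inside the loop makes every table read from the last name in the list, while the factory version binds each name correctly.

import dlt

table_names = ["table1", "table2", "table3"]

# Pitfall: the closure captures the loop variable itself, so after the loop ends
# every table body sees name == "table3".
for name in table_names:
    @dlt.table(name=f"{name}_broken")
    def _t():
        return spark.read.table(f"source_database.{name}")  # late-bound: always "table3"

# Fix: a factory function binds the current value on each call.
def define_table(name: str):
    @dlt.table(name=f"{name}_ok")
    def _t():
        return spark.read.table(f"source_database.{name}")  # bound to this call's name

for name in table_names:
    define_table(name)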