yesterday
Hi Guys,
We have a DLT pipeline that is reading data from landing to raw (csv files into tables) for approximately 80 tables.
In our first attempt at this we declared each table separately in a python notebook. One @Dlt table declared per cell.
Then when another database came along with 300 tables we looked for a better solution and found a way to dynamically declare the dlt tables using a loop and a table with the table names we want to declare. This works good and now there is no repetition of code like before. However I discovered a trade off I hope we can get around.
Using the first method where the tables are statically declared on each line the INITAILIZATION and SETTING UP TABLES stages are taking only 4 minutes together. But when we use the dynamic declaration method its now taking 30 to 35 minutes for those same stages.
Has anyone else who is using DLT dynamic table declaration encountered this big jump in run-time using this style? A 30 minute jump in run time seems excessive to me just to produce the dlt declarations.
Again, thanks for any suggestions or help
6 hours ago
Hello , @eballinger
Table Metadata Initialization Overhead
When dynamically declaring tables, your loop might be causing additional overhead by reinitializing metadata or creating resources redundantly.
Suggestions:
Ensure that the loop only processes the essential metadata and avoids redundant operations.
Use caching mechanisms for metadata if applicable, to avoid fetching the same information multiple times.
2. Parallel Execution
In the static approach, each table is declared independently and likely allows for better parallel execution. A dynamic loop may serialize operations or block parallelism.
Suggestions:
Use multi-threading or asynchronous execution to declare tables in parallel, especially if the DLT framework supports parallel processing.
Group tables logically and batch their creation.
3. Code Complexity in the Loop
The logic inside the dynamic loop could be more computationally expensive than expected, such as repeated operations, large data manipulations, or complex branching.
Suggestions:
Profile your loop logic to identify bottlenecks (e.g., using Python's cProfile or timeit modules).
Simplify the logic, removing unnecessary operations.
4. Validation and Dependency Checks
DLT may validate each dynamically declared table and check dependencies, which could take longer dynamically compared to static declarations where such checks may be cached.
Suggestions:
Check if DLT provides configuration options to limit validation overhead for dynamic declarations.
If you’re not using all tables, filter the table list to reduce unnecessary declarations.
5. Metadata Table Lookup
If your dynamic approach depends on querying a metadata table or schema, delays may occur due to inefficient database queries.
Suggestions:
Optimize database queries for fetching table metadata (e.g., indexes, caching).
Pre-fetch the metadata into memory if feasible and iterate over it locally.
6. Initialization in DLT
DLT may optimize differently between static and dynamic methods, such as precomputing dependencies for static declarations.
Suggestions:
Review DLT-specific documentation for optimizations when using dynamic declarations.
Check if there are recommended practices for declaring large numbers of tables dynamically.
7. Use Partitioned Runs
If tables can be grouped into logical partitions, split the pipeline initialization into smaller chunks. For instance, initialize 50 tables at a time in separate runs and monitor performance.
Example Optimized Dynamic Declaration:
python
Copy code
import dlt
# Pre-fetch metadata for tables
table_metadata = get_table_metadata() # Replace with your function to fetch metadata
@dlt.pipeline(name="my_pipeline", storage_path="/path/to/storage")
def my_pipeline():
for table in table_metadata:
@dlt.table(
name=table['name'],
schema=table['schema'],
primary_key=table['primary_key'],
)
def load_table():
return read_csv(table['path']) # Replace with your data loading logic
if __name__ == "__main__":
my_pipeline().run()
This ensures minimal overhead by pre-fetching metadata and only declaring what’s necessary.
Next Steps
Profile your dynamic logic to identify bottlenecks.
Implement parallelism where possible.
Optimize metadata fetching and DLT initialization configurations.
Best Regards
6 hours ago
Hello @eballinger , thank you for your question. To better assist you, could you clarify a few details?
The increased runtime with dynamic declarations is likely due to the overhead of processing each table dynamically, compared to the static approach where these computations are predefined. To address this:
If these don’t resolve the issue, let me know more specifics, and I’ll provide further suggestions!
7m ago
Thanks for the excellent suggestions VZLA. Here is a copy of the dynamic code just FYI:
Event Logs:
2015-01-08 15:18:17 EST User xxx started an update
2015-01-08 15:18:17 EST Update 15d539 started by API call
2015-01-08 15:18:18 EST Update 15d539 is INITIALIZING
2015-01-08 15:38:31 EST Update 15d539 is SETTING_UP_TABLES
2015-01-08 15:48:45 EST Flow <table_name> is defined as APPEND
...
2015-01-08 15:49:59 EST Update 15d539 is COMPLETED
---------------------------------------------------
def create_table(name):
@Dlt.table(name=name,
comment="Raw Data Tables",
table_properties={
"quality":"bronze"
})
def t():
# Define schema for the table based on the table schema reference
schema = StructType([StructField(col_name, getattr(pyspark.sql.types, type_fun)(), True) for (col_name, type_fun) in table_schema.where((col('table_name')==name)|(col('table_name')=='all_tables')).orderBy(col('column_id')).select('col_name','type_fun').collect()])
# Adjust DecimalType fields with precision and scale
#for field in schema:
# ## Check on the decimal columns and update them. Some are set as precision of 0 but that wouldn't make any sense, so we're defaulting those ones to 25 to be safe
# if isinstance(field.dataType, DecimalType):
# precision_scale = table_schema.where((col('table_name') == name) & (col('col_name') == field.name)).select('data_precision', 'data_scale').collect()[0]
# if precision_scale['data_precision'] == '0':
# field.dataType = DecimalType(precision=int("25"), scale=int(precision_scale['data_scale']))
# else:
# field.dataType = DecimalType(precision=int(precision_scale['data_precision']), scale=int(precision_scale['data_scale']))
# Read CSV data into the table
return readCSV(schema, name)
# Loop through each table name and create the corresponding DLT table
for table_name in table_list:
create_table(table_name)
-----------------------------------------------
I just found the issue though. Its the new code added above (Adjust Decimal Type section) that's causing the delay and is not part of the original static declaration code. So I wasn't exactly comparing apples to apples. When I comment out that part this dynamic code runs in 45 seconds. So I will dig further into that code now and see if I can improve that part (which is the true bottleneck now).
Thanks again
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group