@Stentone
You're right that the dependencies parameter isn't available for the dlt.table decorator - that was my mistake. Let's focus on approaches that will actually work in your environment.
1. Try View-Based Approach
Instead of using FK constraints directly, create the tables without constraints and use views to establish the dependencies:
import dlt

# Define tables without FK constraints
@dlt.table(name="user")
def user():
    return spark.table("source_users")

@dlt.table(name="orders")
def orders():
    return spark.table("source_orders")

# Create a view that references both tables to establish the dependency
@dlt.view(name="orders_with_users")
def orders_with_users():
    return spark.sql("""
        SELECT o.*, u.*
        FROM LIVE.orders o
        LEFT JOIN LIVE.user u ON o.user_id = u.id
    """)
Because the view reads both tables through the LIVE schema, DLT creates user and orders before any attempt to join them.
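Since the FK will be NOT ENFORCED either way, you can also hang an expectation off a similar view so that orders with no matching user get recorded in the pipeline's event log. A sketch; the orders_user_check name and matched_user_id alias are my own:
import dlt

@dlt.view(name="orders_user_check")
@dlt.expect("order_has_user", "matched_user_id IS NOT NULL")  # records violations, keeps the rows
def orders_user_check():
    return spark.sql("""
        SELECT o.*, u.id AS matched_user_id
        FROM LIVE.orders o
        LEFT JOIN LIVE.user u ON o.user_id = u.id
    """)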
2. Explicit Table Creation Order
Use an expectation to control the order of execution:
# First create the referenced table
@dlt.table(name="user")
def user():
    return spark.table("source_users")

# Then create the referring table with an expectation that checks the referenced table
@dlt.table(name="orders")
@dlt.expect_or_fail("referenced_table_exists", "1 = 1")  # Always true; names the dependency check in the pipeline
def orders():
    # Reading LIVE.user registers it as an upstream dependency and verifies it has rows
    user_exists = spark.sql("SELECT COUNT(*) > 0 FROM LIVE.user").collect()[0][0]
    if not user_exists:
        raise Exception("Referenced table 'user' does not exist or is empty")
    return spark.table("source_orders")
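A variant of the same idea, sketched under the assumption that dlt.read() is available in your runtime: reading the upstream dataset through dlt.read() declares the dependency on user explicitly, so the dummy expectation isn't carrying the ordering on its own:
import dlt

@dlt.table(name="orders")
def orders():
    # dlt.read() registers LIVE.user as an upstream dependency of orders
    users = dlt.read("user")
    if users.limit(1).count() == 0:
        raise Exception("Referenced table 'user' has no rows")
    return spark.table("source_orders")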
3. Two-Phase Approach
Split your pipeline into two phases:
# Phase 1: Create all tables without FK constraints
@dlt.table(name="user")
def user():
    return spark.table("source_users")

@dlt.table(name="orders")
def orders():
    return spark.table("source_orders")

# Phase 2: Add FK constraints via a metadata table that depends on all the others
@dlt.table(name="metadata_updates")
def add_metadata():
    # First verify all tables exist
    tables_exist = spark.sql("""
        SELECT
            EXISTS (SELECT 1 FROM LIVE.user LIMIT 1) AND
            EXISTS (SELECT 1 FROM LIVE.orders LIMIT 1) AS all_exist
    """).collect()[0][0]
    if tables_exist:
        # Then apply the FK constraint via SQL
        try:
            spark.sql("""
                ALTER TABLE LIVE.orders
                ADD CONSTRAINT fk_user_id
                FOREIGN KEY (user_id) REFERENCES LIVE.user(id) NOT ENFORCED
            """)
        except Exception as e:
            # Log but don't fail the pipeline
            print(f"Failed to add FK constraint: {str(e)}")
    # Return a dummy DataFrame so the table has data
    return spark.createDataFrame([[1]], ["metadata_applied"])
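Because the try/except above swallows failures, it's worth checking afterwards, outside the pipeline, whether the constraint actually landed. A sketch, assuming Unity Catalog's information_schema and the sample_catalog.sales_gold names from your setup:
# Run in a regular notebook against the published catalog
spark.sql("""
    SELECT constraint_name, constraint_type, table_name
    FROM sample_catalog.information_schema.table_constraints
    WHERE table_schema = 'sales_gold'
      AND table_name = 'orders'
""").show(truncate=False)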
4. Post-Pipeline Job
If none of the above works, consider a two-step process:
1. Run your DLT pipeline without FK constraints.
2. Run a separate notebook job after the DLT pipeline completes that adds the constraints:
# In a separate notebook that runs after the DLT pipeline completes
spark.sql("""
    ALTER TABLE sample_catalog.sales_gold.orders
    ADD CONSTRAINT fk_user_id
    FOREIGN KEY (user_id) REFERENCES sample_catalog.sales_gold.user(id) NOT ENFORCED
""")
5. Use Comments Instead of FK Constraints
If the goal is documentation rather than enforcing the relationship:
@dlt.table(
    name="orders",
    comment="Orders table with FK: user_id references user(id)"
)
def orders():
    return spark.table("source_orders")
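You could also record the relationship as a table property so it shows up in DESCRIBE EXTENDED and catalog tooling; a sketch using the table_properties argument, where the property key is just my own naming convention:
@dlt.table(
    name="orders",
    comment="Orders table with FK: user_id references user(id)",
    table_properties={
        # Custom key; documentation only, nothing enforces it
        "comment.fk.user_id": "references user(id), not enforced"
    }
)
def orders():
    return spark.table("source_orders")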
Given the issues you're experiencing, you may want to consider whether the ERD feature is worth the trouble right now. The underlying problem appears to be that DLT checks for the existence of referenced tables before it has created all of them, even when the constraints are NOT ENFORCED.
LR