Hello, thank you for your question.
Delta Live Tables (DLT) does not currently have built-in support for creating or enforcing foreign key constraints within a pipeline. DLT is designed around declarative data transformations and data quality rather than schema-level relationships. It supports expectations for managing data quality, but expectations are not the same as enforcing foreign key constraints. Primary and foreign key constraints in Databricks are informational only and are not enforced; they are available in Unity Catalog on Delta tables, but within a DLT pipeline they do not give you enforced schema-level relationships.
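For reference, here is a minimal sketch of an expectation (the rule name and the sales_order_clean table name are just placeholders); it validates rows, not relationships between tables:

import dlt
from pyspark.sql import functions as F

@dlt.table
@dlt.expect_or_drop("salesorder_not_null", "SALESORDER IS NOT NULL")  # row-level data quality rule, not a key constraint
def sales_order_clean():
    return (
        spark.readStream.table("adm_sap_qas_bronze_dev.sd.sales_order")
        .withColumn("load_ts", F.current_timestamp())
    )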
If you need to manage relationships between tables within the same pipeline, you have to handle that logic yourself in your transformation code. That said, there are a few approaches for this scenario, including your proposed solution.
You can implement a dynamic approach like the one in your example, with a few additional steps and modifications worth trying. These are only general suggestions; take what is relevant to your use case and adapt as needed:
First, consider adding constraints after the pipeline runs: once the DLT pipeline completes, run a separate step that adds the foreign key constraints with ALTER TABLE. Example:
spark.sql("""
ALTER TABLE mul_dev_tests.dlt_sap_test_new_new.sales_order
ADD CONSTRAINT fk_dlt_teste FOREIGN KEY (SALESORDER) REFERENCES mul_dev_tests.dlt_sap_test_new_new.sales_order (SALESORDER)
NOT ENFORCED
""")
Second, you could use a helper function to construct the schema dynamically, adding the foreign key clause only when the referenced table already exists; to avoid unnecessary queries, you could also restrict this check to development or debugging runs:
import dlt
from pyspark.sql import functions as F


def add_fk(constraint_name, foreign_key, full_table_path):
    """Return a FOREIGN KEY clause only if the referenced table already exists."""
    try:
        spark.sql(f"DESCRIBE TABLE {full_table_path}")
        return f", CONSTRAINT {constraint_name} FOREIGN KEY({foreign_key}) REFERENCES {full_table_path}({foreign_key}) NOT ENFORCED"
    except Exception:
        # Referenced table not found (e.g. on the first pipeline run): skip the constraint
        return ""


@dlt.table(
    schema=f"""
        SALESORDER STRING NOT NULL PRIMARY KEY,
        SALESORDERTYPE STRING,
        SALESORDERPROCESSINGTYPE STRING,
        CREATEDBYUSER STRING,
        LASTCHANGEDBYUSER STRING,
        load_ts TIMESTAMP NOT NULL
        {add_fk("fk_dlt_teste", "SALESORDER", "mul_dev_tests.dlt_sap_test_new_new.sales_order")}
    """
)
def sales_order_2():
    return (
        spark.readStream.table("adm_sap_qas_bronze_dev.sd.sales_order")
        .select("SALESORDER", "SALESORDERTYPE", "SALESORDERPROCESSINGTYPE", "CREATEDBYUSER", "LASTCHANGEDBYUSER")
        .withColumn("load_ts", F.current_timestamp())
    )
Third, to ensure proper execution order, make one table depend on another by explicitly reading the referenced table in your transformation logic. This ensures the tables are created in sequence rather than concurrently.
@dlt.table
def dependent_table():
    # Reading "referenced_table" makes DLT materialize it before this table
    ref_table = dlt.read("referenced_table")
    return ref_table.filter("some_condition")
So, as general advice:
Foreign Key Usage: Use foreign key constraints only as metadata for documentation and downstream tool compatibility, as they are not enforced.
Schema Management: Maintain schema constraints in a separate script or deployment step to avoid coupling schema management with data transformation logic (see the sketch after this list).
Pipeline Isolation: If schema management complexity grows, consider separating schema creation and data ingestion into different workflows.
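For example, a minimal sketch of such a separate constraint script (the constraint list is hypothetical; adapt the table, constraint names, and definitions to your schema):

# Run after the DLT pipeline has created/updated the tables.
# Note: re-running fails if a constraint already exists; drop it first or add a guard.
FK_CONSTRAINTS = [
    (
        "mul_dev_tests.dlt_sap_test_new_new.sales_order",
        "fk_dlt_teste",
        "FOREIGN KEY (SALESORDER) REFERENCES mul_dev_tests.dlt_sap_test_new_new.sales_order (SALESORDER) NOT ENFORCED",
    ),
]

for table, name, definition in FK_CONSTRAINTS:
    spark.sql(f"ALTER TABLE {table} ADD CONSTRAINT {name} {definition}")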
Hope it helps!