
Delta live tables - foreign keys

ismaelhenzel
Contributor

I'm creating ingestions using Delta Live Tables. DLT supports declaring a table schema with constraints such as foreign keys. The problem is: how can I create foreign keys between two tables in the same pipeline that have no read/write relation, only a foreign key relation? The DLT graph will run both at the same time, and the constraint will fail, reporting that the referenced table does not exist. Is there a way to ignore constraints on the first run or avoid the error, or do I need to run an ALTER TABLE after the DLT run to create these relations?
I tested something like the code below to work around the problem, but I don't think it is a good solution.

 

import dlt
from pyspark.sql import functions as F


def add_fk(constraint_name, foreign_key, full_table_path):
    # Only emit the FK clause if the referenced table already exists,
    # otherwise the first pipeline run fails.
    try:
        spark.sql(f"DESCRIBE TABLE {full_table_path}")
        return f", CONSTRAINT {constraint_name} FOREIGN KEY({foreign_key}) REFERENCES {full_table_path}({foreign_key}) NOT ENFORCED"
    except Exception:
        return ""


@dlt.table(
    schema=f"""
        SALESORDER STRING NOT NULL PRIMARY KEY,
        SALESORDERTYPE STRING,
        SALESORDERPROCESSINGTYPE STRING,
        CREATEDBYUSER STRING,
        LASTCHANGEDBYUSER STRING,
        load_ts TIMESTAMP NOT NULL
        {add_fk("fk_dlt_teste", "SALESORDER", "mul_dev_tests.dlt_sap_test_new_new.sales_order")}
    """
)
def sales_order_2():
    return (
        spark.readStream.table("adm_sap_qas_bronze_dev.sd.sales_order")
        .select("SALESORDER", "SALESORDERTYPE", "SALESORDERPROCESSINGTYPE", "CREATEDBYUSER", "LASTCHANGEDBYUSER")
        .withColumn("load_ts", F.current_timestamp())
    )
1 REPLY

VZLA
Databricks Employee

Hello, thank you for your question.

Delta Live Tables (DLT) does not currently have built-in support for creating or enforcing foreign key constraints within a pipeline, because DLT focuses on declarative data transformations and data quality rather than schema-level relationships. DLT supports expectations for managing data quality, but these are not the same as foreign key constraints. Also note that primary and foreign key constraints in Databricks are informational only and are not enforced: they are available in Unity Catalog and Delta Lake, but within a DLT pipeline they do not establish or enforce schema-level relationships between tables.

If you need to manage relationships between tables within the same pipeline, you have to handle that logic yourself in your data transformation code. There are a few approaches for this scenario, including a variant of your proposed solution.
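As one concrete illustration of handling it in the transformation code, a referential-integrity check can be approximated with an expectation instead of a constraint. The following is only a minimal sketch, not part of your pipeline as-is: the table names reuse sales_order / sales_order_2 from your example, the check table name is hypothetical, and by default dlt.expect only records violations in the pipeline metrics (you could switch to expect_or_drop or expect_or_fail for stricter behaviour).

import dlt
from pyspark.sql import functions as F

@dlt.table(name="sales_order_2_fk_check")
@dlt.expect("fk_salesorder_exists", "SALESORDER_ref IS NOT NULL")
def sales_order_2_fk_check():
    # Flag rows in sales_order_2 whose SALESORDER has no match in sales_order,
    # i.e. an unenforced "foreign key" validation expressed as a data quality rule.
    child = dlt.read("sales_order_2")
    parent = (
        dlt.read("sales_order")
        .select(F.col("SALESORDER").alias("SALESORDER_ref"))
        .distinct()
    )
    return child.join(parent, child["SALESORDER"] == parent["SALESORDER_ref"], "left")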

You can implement a dynamic approach like your example, with a few additional steps and modifications worth trying. These are only general suggestions; please take what is relevant to your use case and adapt them as needed:

First, you may consider adding the constraints after pipeline execution: run a separate step once the DLT pipeline completes that adds the foreign key constraints to the tables with ALTER TABLE. Example:

spark.sql("""
    ALTER TABLE mul_dev_tests.dlt_sap_test_new_new.sales_order
    ADD CONSTRAINT fk_dlt_teste FOREIGN KEY (SALESORDER) REFERENCES mul_dev_tests.dlt_sap_test_new_new.sales_order (SALESORDER)
    NOT ENFORCED
""")

Second, you could keep a helper function that builds the schema dynamically, as in your example, and avoid unnecessary DESCRIBE queries by restricting that check to development or initial deployments rather than every pipeline update:

import dlt
from pyspark.sql import functions as F


def add_fk(constraint_name, foreign_key, full_table_path):
    # Emit the FK clause only when the referenced table already exists,
    # so the first pipeline run does not fail on a missing table.
    try:
        spark.sql(f"DESCRIBE TABLE {full_table_path}")
        return f", CONSTRAINT {constraint_name} FOREIGN KEY({foreign_key}) REFERENCES {full_table_path}({foreign_key}) NOT ENFORCED"
    except Exception:
        return ""


@dlt.table(
    schema=f"""
        SALESORDER STRING NOT NULL PRIMARY KEY,
        SALESORDERTYPE STRING,
        SALESORDERPROCESSINGTYPE STRING,
        CREATEDBYUSER STRING,
        LASTCHANGEDBYUSER STRING,
        load_ts TIMESTAMP NOT NULL
        {add_fk("fk_dlt_teste", "SALESORDER", "mul_dev_tests.dlt_sap_test_new_new.sales_order")}
    """
)
def sales_order_2():
    return (
        spark.readStream.table("adm_sap_qas_bronze_dev.sd.sales_order")
        .select("SALESORDER", "SALESORDERTYPE", "SALESORDERPROCESSINGTYPE", "CREATEDBYUSER", "LASTCHANGEDBYUSER")
        .withColumn("load_ts", F.current_timestamp())
    )

Third, to ensure proper execution order, you can define one table as dependent on another by explicitly reading the referenced table in your transformation logic. This ensures the tables are created in sequence rather than concurrently.

@dlt.table
def dependent_table():
    # Reading the other table registers a dependency in the DLT graph,
    # so "referenced_table" is materialized before this table.
    ref_table = dlt.read("referenced_table")
    return ref_table.filter("some_condition")

So, as general advice:

Foreign Key Usage: Use foreign key constraints only as metadata for documentation and downstream tool compatibility, as they are not enforced.
Schema Management: Maintain schema constraints in a separate script or deployment step to avoid coupling schema management with data transformation logic (a small sketch of such a step follows below).
Pipeline Isolation: If schema management complexity grows, consider separating schema creation and data ingestion into different workflows.
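As an illustration of keeping constraints in a separate deployment step, they could be driven from a small list so all schema relationships live in one place, outside the pipeline code. This is only a sketch; the constraint name and table paths reuse the ones from this thread, and the child table path is an assumption:

# Hypothetical constraint-deployment script, run after the pipeline completes.
foreign_keys = [
    # (child table, constraint name, column, referenced table)
    (
        "mul_dev_tests.dlt_sap_test_new_new.sales_order_2",
        "fk_dlt_teste",
        "SALESORDER",
        "mul_dev_tests.dlt_sap_test_new_new.sales_order",
    ),
]

for child, name, column, parent in foreign_keys:
    try:
        spark.sql(f"""
            ALTER TABLE {child}
            ADD CONSTRAINT {name}
            FOREIGN KEY ({column}) REFERENCES {parent} ({column})
            NOT ENFORCED
        """)
    except Exception as e:
        # Typically this only fails when the constraint already exists from a previous run.
        print(f"Skipping {name} on {child}: {e}")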

Hope it helps!
