Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Declarative Pipelines: set Merge Schema to False

a_user12
New Contributor III

Dear Team!

For a certain table I want to prevent the schema from being updated automatically. With plain structured streaming I can do the following:

silver_df.writeStream \
    .format("delta") \
    .option("mergeSchema", "false") \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .table("silver_table")

How can I set mergeSchema=false with Declarative Pipelines?

1 ACCEPTED SOLUTION

Accepted Solutions

Hubert-Dudek
Esteemed Contributor III

Schema merging is automatic in DLT. If there are significant schema changes, you need to do a full refresh. Maybe consider storing everything (the whole JSON) in a single VARIANT column and unpacking only what is necessary later - this way you keep it under control.


7 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @a_user12 ,

Did you try something like in the link below? Of course, in your case you want to set it to "false":

Learn Data Engineering: Databricks Delta Live Table | by THE BRICK LEARNING | Medium


a_user12
New Contributor III

@szymon_dybczak - thank you for your response.

I tried:

import dlt
from pyspark.sql.functions import col, from_json, lit

@dlt.table(
    name="deserialized",
    comment="Raw messages from Kafka topic as JSON",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "pipelines.autoCompact.managed": "true"
    }
)
def deserialize():
    # Read the stringified Kafka messages and parse the payload via schema inference
    return spark.readStream \
        .table("stringified") \
        .withColumn("payload", from_json(col("payload"), None, {"schemaLocationKey": "x"})) \
        .select("topic", "timestamp", "payload") \
        .withColumn("new-x", lit("foo"))



@dlt.table(
    name="enriched_table",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "pipelines.autoCompact.managed": "true"
    }
)
def enriched_table():
    return spark.readStream.option("mergeSchema","false").table("deserialized")
        #.withColumn("new",lit("new"))  # ensure columns match exactly    

I would expect that if the attribute "new-x" does not yet exist in the table "enriched_table", I would get an error. Instead, it simply adds the new column to "enriched_table".

 

saurabh18cs
Honored Contributor II

Hi @a_user12, DLT is designed to automatically evolve the schema of tables as your pipeline logic changes. If your code returns a DataFrame with new columns, DLT will add those columns to the table automatically. There is no built-in option to prevent this or to enforce a fixed schema.

1) You can try enforcing explicit schema definitions in your pipeline code.

2) Also try disabling schema auto merge globally:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "false")
 
Example for 1):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define an explicit schema for the payload
payload_schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", IntegerType(), True)
])

# Parse with the fixed schema instead of schema inference
.withColumn("payload", from_json(col("payload"), payload_schema))

@a_user12
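
Putting it together, a minimal sketch of how this could look inside your pipeline, assuming the stringified source table from your post (field1/field2 are only placeholder field names):

import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Fixed payload schema: only these fields can ever appear in the payload struct,
# so downstream tables cannot pick up new columns by accident.
payload_schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", IntegerType(), True)
])

@dlt.table(
    name="deserialized",
    comment="Raw messages from Kafka topic parsed with a fixed schema"
)
def deserialize():
    return spark.readStream \
        .table("stringified") \
        .withColumn("payload", from_json(col("payload"), payload_schema)) \
        .select("topic", "timestamp", "payload")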

saurabh18cs
Honored Contributor II

BETTER MODIFIED RESPONSE

Hi @a_user12, DLT is designed to automatically evolve the schema of tables as your pipeline logic changes. If your code returns a DataFrame with new columns, DLT will add those columns to the table automatically. There is no built-in option to prevent this or to enforce a fixed schema. However, in your case you are adding the option during readStream: mergeSchema on readStream has no effect on whether downstream tables evolve. Schema evolution happens on write (or when DLT materializes a table).

1) You can try enforcing explicit schema definitions in your pipeline code (optional).

2) Also try disabling schema auto merge globally (if adding it to your write is not working):

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "false")
 
Example for 1):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define an explicit schema for the payload
payload_schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", IntegerType(), True)
])

# Parse with the fixed schema instead of schema inference
.withColumn("payload", from_json(col("payload"), payload_schema))

@a_user12
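
If you prefer not to set the conf globally, the @dlt.table decorator also accepts a spark_conf argument, so a hedged sketch scoping the same setting to just the questioner's enriched_table could look like this (whether this conf actually stops DLT from evolving the target table is something to verify on a test pipeline):

import dlt

@dlt.table(
    name="enriched_table",
    # Assumption: scoping the suggested conf to this one table's query execution
    spark_conf={"spark.databricks.delta.schema.autoMerge.enabled": "false"},
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "pipelines.autoCompact.managed": "true"
    }
)
def enriched_table():
    return spark.readStream.table("deserialized")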

a_user12
New Contributor III

I am aware of from_json but I want to prevent schema changes on the delta table itself

saurabh18cs
Honored Contributor II

Hi @a_user12, can you try the following:

ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5', 'delta.columnMapping.mode' = 'name', 'delta.schema.autoMerge.enabled' = 'false');
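
Since DLT manages its own tables, the same properties could also be declared in the pipeline code instead of running ALTER TABLE manually. A sketch using the table_properties argument of @dlt.table, assuming these Delta properties are honored for the DLT-managed table:

import dlt

@dlt.table(
    name="enriched_table",
    table_properties={
        # Same properties as the ALTER TABLE suggestion above
        "delta.minReaderVersion": "2",
        "delta.minWriterVersion": "5",
        "delta.columnMapping.mode": "name",
        "delta.schema.autoMerge.enabled": "false"
    }
)
def enriched_table():
    return spark.readStream.table("deserialized")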

Hubert-Dudek
Esteemed Contributor III

Schema merging is automatic in DLT. If there are significant schema changes, you need to do a full refresh. Maybe consider storing everything (the whole JSON) in a single VARIANT column and unpacking only what is necessary later - this way you keep it under control.
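
A minimal sketch of that VARIANT approach, assuming a runtime with VARIANT support (parse_json) and reusing the stringified source table from the question; field1 and the Databricks colon path syntax for extracting it are only illustrative:

import dlt
from pyspark.sql.functions import col, parse_json, expr

@dlt.table(
    name="deserialized",
    comment="Whole Kafka payload kept as a single VARIANT column"
)
def deserialize():
    # The table schema stays fixed even if the upstream JSON gains new fields,
    # because everything lives inside the one VARIANT column.
    return spark.readStream \
        .table("stringified") \
        .withColumn("payload", parse_json(col("payload"))) \
        .select("topic", "timestamp", "payload")

@dlt.table(name="enriched_table")
def enriched_table():
    # Unpack only the fields you need; anything new stays inside the VARIANT.
    return spark.readStream \
        .table("deserialized") \
        .withColumn("field1", expr("payload:field1::string"))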