a week ago
Dear Team!
I want to prevent the schema of a certain table from being updated automatically. With plain Structured Streaming I can do the following:
silver_df.writeStream \
    .format("delta") \
    .option("mergeSchema", "false") \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .table("silver_table")

How can I set mergeSchema=false with Declarative Pipelines?
Wednesday
It is automatic in DLT. If there are significant schema changes, you need to run a full refresh. Maybe consider storing everything (the whole JSON) in a single VARIANT column and unpacking only what is necessary later - this way you will have it under control.
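For example, a minimal sketch of the VARIANT approach (assuming a runtime where parse_json and try_variant_get are available, i.e. a recent DBR with VARIANT support; the table names raw_variant/unpacked and the field x are illustrative):

import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="raw_variant",
    comment="Whole JSON payload kept in a single VARIANT column"
)
def raw_variant():
    # parse_json keeps the full document; upstream JSON changes never
    # alter this table's schema.
    return (
        spark.readStream.table("stringified")
        .select("topic", "timestamp",
                F.parse_json(F.col("payload")).alias("payload"))
    )

@dlt.table(name="unpacked")
def unpacked():
    # Unpack only what you need; everything else stays inside the VARIANT.
    return (
        spark.readStream.table("raw_variant")
        .select("topic", "timestamp",
                F.try_variant_get("payload", "$.x", "string").alias("x"))
    )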
a week ago
Hi @a_user12 ,
Did you try something like in the link below? Of course, in your case you want to set it to "false":
Learn Data Engineering: Databricks Delta Live Table | by THE BRICK LEARNING | Medium
a week ago
@szymon_dybczak - thank you for your response
I tried the following:
import dlt
from pyspark.sql.functions import from_json, col, lit

@dlt.table(
    name="deserialized",
    comment="Raw messages from Kafka topic as JSON",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "pipelines.autoCompact.managed": "true"
    }
)
def deserialize():
    # Read the raw Kafka messages from the "stringified" table and
    # deserialize the JSON payload via schema inference
    return spark.readStream \
        .table("stringified") \
        .withColumn("payload", from_json(col("payload"), None, {"schemaLocationKey": "x"})) \
        .select("topic", "timestamp", "payload") \
        .withColumn("new-x", lit("foo"))
@dlt.table(
    name="enriched_table",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "pipelines.autoCompact.managed": "true"
    }
)
def enriched_table():
    return spark.readStream.option("mergeSchema", "false").table("deserialized")
    # .withColumn("new", lit("new"))  # ensure columns match exactly

I would expect that, if the attribute "new-x" does not yet exist in the table "enriched_table", I would get an error. Instead, it simply adds the new column to "enriched_table".
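What I actually want is a fail-fast guard like the sketch below (EXPECTED is a hypothetical pinned schema I would maintain myself; note that schema equality also compares nullability, so this may be stricter than intended):

import dlt
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Hypothetical pinned schema; adjust the types to whatever "deserialized"
# really produces (payload is a struct after from_json, for instance).
EXPECTED = StructType([
    StructField("topic", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("payload", StringType()),
])

@dlt.table(name="enriched_table")
def enriched_table():
    df = spark.readStream.table("deserialized").select("topic", "timestamp", "payload")
    # Fail the update instead of silently evolving the target schema.
    if df.schema != EXPECTED:
        raise ValueError(f"Schema drift detected: {df.schema.simpleString()}")
    return df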
a week ago
Hi @a_user12, DLT is designed to automatically evolve the schema of tables as your pipeline logic changes. If your code returns a DataFrame with new columns, DLT will add those columns to the table automatically; there is no built-in option to prevent this or to enforce a fixed schema. Also, in your case you are adding the option during readStream, but mergeSchema on readStream has no effect on whether downstream tables evolve. Schema evolution happens on write (or when DLT materializes a table).
1) You can try enforcing an explicit schema definition in your pipeline code (optional) - see the sketch below.
2) You can also try disabling schema auto-merge globally (if setting it on your write does not work).
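For option 1, a minimal sketch that pins the output schema by selecting and casting an explicit column list (EXPECTED and its types are illustrative; adjust them to what "deserialized" actually produces):

import dlt
from pyspark.sql import functions as F

# Hypothetical pinned column list; anything upstream adds beyond these
# columns is dropped here instead of being merged into the target table.
EXPECTED = {"topic": "string", "timestamp": "timestamp", "payload": "string"}

@dlt.table(name="enriched_table")
def enriched_table():
    df = spark.readStream.table("deserialized")
    return df.select([F.col(c).cast(t) for c, t in EXPECTED.items()])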
Monday
I am aware of from_json, but I want to prevent schema changes on the Delta table itself.
Wednesday
Hi @a_user12, can you try the following:
ALTER TABLE <table_name> SET TBLPROPERTIES (
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5',
  'delta.columnMapping.mode' = 'name',
  'delta.schema.autoMerge.enabled' = 'false'
);
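If the table is defined in the pipeline itself, the same properties can also be attached from the @dlt.table decorator (a sketch; note that auto-merge is normally governed by the Spark conf spark.databricks.delta.schema.autoMerge.enabled, so whether the table property alone blocks evolution is worth verifying):

import dlt

@dlt.table(
    name="enriched_table",
    table_properties={
        # Same properties as the ALTER TABLE above, set at definition time.
        "delta.columnMapping.mode": "name",
        "delta.schema.autoMerge.enabled": "false",
    },
)
def enriched_table():
    return spark.readStream.table("deserialized")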