@szymon_dybczak - thank you for your response.
I tried:
@dlt.table(
    name="deserialized",
    comment="Raw messages from Kafka topic as JSON",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "pipelines.autoCompact.managed": "true",
    },
)
def deserialize():
    """Parse the JSON payload of the upstream 'stringified' stream.

    Returns a streaming DataFrame with the parsed payload plus a literal
    marker column.
    """
    # NOTE(review): a None schema with the "schemaLocationKey" option relies
    # on Databricks' from_json schema inference/evolution — confirm the DBR
    # version in use supports it.
    source = spark.readStream.table("stringified")
    parsed = source.withColumn(
        "payload",
        from_json(col("payload"), None, {"schemaLocationKey": "x"}),
    )
    # "new-x" contains a hyphen, so downstream references need backticks.
    return parsed.select("topic", "timestamp", "payload").withColumn(
        "new-x", lit("foo")
    )
@dlt.table(
    name="enriched_table",
    table_properties={
        "pipelines.autoOptimize.managed": "true",
        "pipelines.autoCompact.managed": "true",
    },
)
def enriched_table():
    """Pass the 'deserialized' stream through with mergeSchema disabled."""
    reader = spark.readStream.option("mergeSchema", "false")
    return reader.table("deserialized")
    # .withColumn("new", lit("new"))  # ensure columns match exactly
I would expect that if the attribute "new-x" does not yet exist in the table "enriched_table", I would get an error. Instead, it simply adds the new column to the "enriched_table".