Edit: Adding code for clarity.
# Top-level schema for the JSON feed: each record carries an "OldRecord" and a
# "NewRecord" nested struct (CDC-style before/after images — TODO confirm with producer).
# NOTE(review): the original snippet had mismatched brackets and passed "OldRecord"
# as a bare string inside the field list; every entry of StructType's list must be
# a StructField. The `...` placeholders below mark the nested fields elided here.
schema = StructType([
    StructField("NewRecord", StructType([
        ...,  # nested NewRecord fields go here
    ])),
    StructField("OldRecord", StructType([
        ...,  # nested OldRecord fields go here
    ])),
])
# Streaming bronze table fed by Auto Loader (cloudFiles) from `sink_dir`.
@dlt.table(
    name="newrecord_raw",
    table_properties={"quality": "bronze"},
    temporary=False,
)
def create_table():
    """Read JSON files incrementally and expose the flattened NewRecord payload.

    Returns a streaming DataFrame containing every field of the NewRecord
    struct plus a `load_dt` ingestion timestamp. Rows whose NewRecord struct
    is absent will surface as all-null columns — TODO confirm whether
    OldRecord-only events should be filtered out upstream.
    """
    query = (
        spark.readStream.format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "json")
        # Removed .option("checkpointLocation", ...): that is a *write*-side
        # option and is ignored on readStream; DLT manages checkpoint
        # locations for its tables automatically.
        .load(sink_dir)
        .select("NewRecord.*")
        # current_timestamp() already yields TimestampType; the original
        # to_timestamp(...) wrapper was a redundant no-op.
        .withColumn("load_dt", current_timestamp())
    )
    return query