ggsmith
Contributor

Edit: Adding code for clarity.

# Top-level struct [OldRecord, NewRecord]
# Every top-level field needs its own StructField wrapper. The nested field
# lists were elided ("...") in the original post and must be filled in with
# the real StructField definitions before this will run.
schema = StructType([
    StructField("NewRecord", StructType([...])),
    StructField("OldRecord", StructType([...])),
])

# streaming query
@dlt.table(
    name="newrecord_raw",
    table_properties={"quality": "bronze"},
    temporary=False,
)
def create_table():
    """Stream the ``NewRecord`` part of incoming JSON files into ``newrecord_raw``.

    Reads JSON files from ``sink_dir`` through the ``cloudFiles`` source using
    the explicit ``schema``, expands the ``NewRecord`` struct into top-level
    columns, and adds a ``load_dt`` timestamp column.

    Returns:
        The streaming DataFrame that defines the table.
    """
    return (
        spark.readStream.format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "json")
        # NOTE(review): checkpointLocation is normally a write-side setting and
        # DLT pipelines manage checkpoints themselves; this reader option is
        # probably unnecessary -- confirm before removing.
        .option("checkpointLocation", "/Volumes/dev/streaming/")
        .load(sink_dir)
        .select("NewRecord.*")
        # current_timestamp() already yields a timestamp column, so no
        # to_timestamp() conversion is needed.
        .withColumn("load_dt", current_timestamp())
    )