The code below follows a similar pattern to load each of the different tables.
import dlt
import re
import pyspark.sql.functions as F
landing_zone = '/Volumes/bronze_dev/landing_zone/'
source = 'addresses'
@dlt.table(
    comment="addresses snapshot",
    name="addresses",
    table_properties={"quality": "bronze"}
)
def addresses():
    return (
        spark.readStream
            .format("cloudFiles")                          # Auto Loader for incremental file ingestion
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", True)
            .option("header", True)
            .option("quote", "\"")
            .load(f"{landing_zone}{source}")
            .select(
                "*",
                # capture ingestion time and originating file for lineage
                F.current_timestamp().alias("processing_time"),
                F.col("_metadata.file_name").alias("source_file")
            )
    )
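As a sketch of how the pattern repeats, a second bronze table can be declared the same way, changing only the source folder and the table metadata. The customers source and table names here are hypothetical placeholders, not part of the original pipeline.

@dlt.table(
    comment="customers snapshot",          # hypothetical second table following the same pattern
    name="customers",
    table_properties={"quality": "bronze"}
)
def customers():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", True)
            .option("header", True)
            .option("quote", "\"")
            # the source folder is inlined rather than reusing the module-level `source`
            # variable: DLT runs all top-level code before invoking the table functions,
            # so rebinding a shared global would change the path every table reads from
            .load(f"{landing_zone}customers")
            .select(
                "*",
                F.current_timestamp().alias("processing_time"),
                F.col("_metadata.file_name").alias("source_file")
            )
    )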