Since Delta Live Tables handles parallelism for you, I would use a metadata table that defines those variables, read it into a list of dicts, and iterate over it in the DLT pipeline like so:
"""
Assumptions:
1: Dict named metadata_dict with database_name, schema_name, table_name, file_path, data_format, load_type
2: Data ingestion from database -> cloud storage is a separate process either overwrite destination or append only destination
3: You already have a session scoped connection to cloud storage or are using mount points
"""
def create_delta_table(database_name: str, schema_name: str, table_name: str, file_path: str, data_format: str, load_type: str) -> None:
    """Take table metadata (file path, schema, table name) and use a batch read or Auto Loader to either drop/reload or append to the destination Delta table.

    Note - Delta Live Tables does not currently support the Unity Catalog three-level namespace; the hive_metastore catalog will be used.

    Args:
        database_name (str): Source database name
        schema_name (str): Source schema name
        table_name (str): Source table name
        file_path (str): Cloud storage location of the table data (S3, GCS, ADLS)
        data_format (str): Cloud storage data format (json, parquet, etc.)
        load_type (str): One of ['reload', 'append']

    Returns:
        None. Registers a DLT table named database_name__schema_name__table_name in the target schema defined by the DLT pipeline, stored in Delta format.
    """
    accepted_load_types = ["reload", "append"]
    if load_type not in accepted_load_types:
        raise ValueError(f"Load type {load_type} is not in accepted load types ['reload', 'append']")
    destination_table_name = database_name + "__" + schema_name + "__" + table_name
    if load_type == "reload":
        @dlt.table(name=destination_table_name)
        def create_or_reload_delta_table():
            df = (
                spark
                .read
                .format(data_format)
                .load(file_path)
            )
            # Do additional transformations here
            # Return final df
            return df

    elif load_type == "append":
        @dlt.table(name=destination_table_name)
        def create_or_append_delta_table():
            df = (
                spark
                .readStream
                .format("cloudFiles")
                .option("cloudFiles.format", data_format)
                # Add additional options like cloudFiles.inferColumnTypes or cloudFiles.schemaEvolutionMode here
                .load(file_path)
            )
            # Do additional transformations here
            # Return final df
            return df
if __name__ == "__main__":
    for table in metadata_dict:
        database_name = table["database_name"]
        schema_name = table["schema_name"]
        table_name = table["table_name"]
        file_path = table["file_path"]
        data_format = table["data_format"]
        load_type = table["load_type"]
        create_delta_table(
            database_name=database_name,
            schema_name=schema_name,
            table_name=table_name,
            file_path=file_path,
            data_format=data_format,
            load_type=load_type,
        )
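
For completeness, here is a minimal sketch of how metadata_dict could be built from a metadata Delta table; the table name config.dlt_ingestion_metadata and its columns are assumptions, so substitute your own:

metadata_dict = [
    row.asDict()
    for row in (
        spark.table("config.dlt_ingestion_metadata")  # hypothetical metadata table
        .select("database_name", "schema_name", "table_name", "file_path", "data_format", "load_type")
        .collect()
    )
]

Collecting to the driver is fine here since the metadata table is tiny (one row per source table).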