Implement Auto Loader to ingest data into Delta Lake; I have 100 different tables with full load, append, and merge scenarios
01-04-2023 07:45 AM
I want to implement Auto Loader to ingest data into Delta Lake from 5 different source systems, and I have 100 different tables in each database. How do we address this dynamically using Auto Loader with the trigger-once option, covering full load, append, and merge scenarios?
Labels: Autoloader, Different Tables, Dynamic
01-04-2023 01:36 PM
Since Delta Live Tables handles parallelism for you, I would use a metadata table that defines some variables, read those into a list of dicts, and iterate over it in a Delta Live Tables pipeline like so:
"""
Assumptions:
1: Dict named metadata_dict with database_name, schema_name, table_name, file_path, data_format, load_type
2: Data ingestion from database -> cloud storage is a separate process either overwrite destination or append only destination
3: You already have a session scoped connection to cloud storage or are using mount points
"""
import dlt
from pyspark.sql import DataFrame


def create_delta_table(database_name: str, schema_name: str, table_name: str, file_path: str, data_format: str, load_type: str) -> None:
    """Take table metadata, including file path, schema, and table name, and use Auto Loader to either drop/reload or append to the destination Delta table.

    Note - Delta Live Tables do not currently support the Unity Catalog three-level namespace; the hive_metastore catalog will be used.

    Args:
        database_name (str): The source database name
        schema_name (str): The source schema name
        table_name (str): The source table name
        file_path (str): Where in cloud storage the table data is located (S3, GCS, ADLS)
        data_format (str): The cloud storage data format (json, parquet, etc.)
        load_type (str): One of ['reload', 'append']

    Returns:
        None. Registers a DLT table named database_name__schema_name__table_name, loaded to the target schema defined in the DLT pipeline in Delta format.
    """
    accepted_load_types = ["reload", "append"]
    if load_type not in accepted_load_types:
        raise ValueError(f"Load type {load_type} is not in accepted load types ['reload', 'append']")

    destination_table_name = database_name + "__" + schema_name + "__" + table_name

    if load_type == "reload":
        @dlt.table(name=destination_table_name)
        def create_or_reload_delta_table() -> DataFrame:
            # Full batch read: the destination table is recomputed on every pipeline update
            df = (
                spark
                .read
                .format(data_format)
                .load(file_path)
            )
            # Do additional transformations here
            # Return final df
            return df

    if load_type == "append":
        @dlt.table(name=destination_table_name)
        def create_or_append_delta_table() -> DataFrame:
            # Incremental streaming read with Auto Loader: only newly arrived files are appended
            df = (
                spark
                .readStream
                .format("cloudFiles")
                .option("cloudFiles.format", data_format)
                # Add additional options like cloudFiles.inferColumnTypes or cloudFiles.schemaEvolutionMode here
                .load(file_path)
            )
            # Do additional transformations here
            # Return final df
            return df


if __name__ == "__main__":
    for table in metadata_dict:
        database_name = table["database_name"]
        schema_name = table["schema_name"]
        table_name = table["table_name"]
        file_path = table["file_path"]
        data_format = table["data_format"]
        load_type = table["load_type"]
        create_delta_table(database_name=database_name, schema_name=schema_name, table_name=table_name, file_path=file_path, data_format=data_format, load_type=load_type)
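The code above assumes metadata_dict already exists. As a minimal sketch of building it (the table name config.ingestion_metadata is an assumption; point it at your own metadata table), you could read the metadata table into a list of dicts like this:

# Minimal sketch: read the metadata table into a list of dicts for the loop above.
# The table name "config.ingestion_metadata" is an assumption; use your own metadata table.
metadata_df = spark.table("config.ingestion_metadata").select(
    "database_name", "schema_name", "table_name", "file_path", "data_format", "load_type"
)
metadata_dict = [row.asDict() for row in metadata_df.collect()]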
01-04-2023 11:42 PM
You can create a generic notebook that is parametrized with the table name/source system and then simply trigger that notebook with different parameters (one run per table/source system).
For parametrization you can use dbutils.widgets (https://docs.databricks.com/notebooks/widgets.html), as sketched below.
For orchestration you can use either Databricks Workflows or any other external orchestrator (e.g. Azure Data Factory, Apache Airflow).
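A rough sketch of such a generic notebook, assuming the widget names, schema/checkpoint paths, and target table naming below (all of which are assumptions you would adapt to your environment):

# Minimal sketch of a generic, parametrized ingestion notebook.
# Widget names and the schema/checkpoint paths are assumptions; adjust to your environment.
dbutils.widgets.text("source_system", "")
dbutils.widgets.text("table_name", "")
dbutils.widgets.text("file_path", "")
dbutils.widgets.text("data_format", "parquet")

source_system = dbutils.widgets.get("source_system")
table_name = dbutils.widgets.get("table_name")
file_path = dbutils.widgets.get("file_path")
data_format = dbutils.widgets.get("data_format")

# Auto Loader stream appended into a Delta table; trigger(availableNow=True) is the
# recommended successor to the trigger-once option for incremental batch-style runs.
query = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", data_format)
    .option("cloudFiles.schemaLocation", f"/mnt/schemas/{source_system}/{table_name}")  # assumed location
    .load(file_path)
    .writeStream
    .option("checkpointLocation", f"/mnt/checkpoints/{source_system}/{table_name}")  # assumed location
    .trigger(availableNow=True)
    .toTable(f"{source_system}__{table_name}")
)
query.awaitTermination()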

