Implement Auto Loader to ingest data into Delta Lake; I have 100 different tables with full load, append, and merge scenarios

AK032716
New Contributor

I want to implement Auto Loader to ingest data into Delta Lake from 5 different source systems, and I have 100 different tables in each database. How do we dynamically address this using Auto Loader with the trigger-once option for the full load, append, and merge scenarios?

3 REPLIES

Jfoxyyc
Valued Contributor

Since Delta Live Tables handles parallelism for you, I would use a metadata table that defines some variables, read those into a list of dicts, and iterate over it in a Delta Live Tables pipeline like so:

"""
Assumptions:
1: Dict named metadata_dict with database_name, schema_name, table_name, file_path, data_format, load_type
2: Data ingestion from database -> cloud storage is a separate process either overwrite destination or append only destination
3: You already have a session scoped connection to cloud storage or are using mount points
"""
 
def create_delta_table(database_name:str, schema_name:str, table_name:str, file_path:str, data_format:str, load_type:str) -> DataFrame:
 
""" Take table metadata including file path, schema, table name and utilize autoloader to either drop/reload or append to destination delta table.
Note - Delta Live Tables do not currently support Unity catalog three level namespace, hive_metastore catalog will be used.
 
Args:
	database_name(str): A string value of the source database name
	schema_name(str): A string value of the source schema name
	table_name(str): A string value of the source table name
	file_path(str): A string value defining where in cloud storage the table data is located (s3, gcp, adls)
	data_format(str): A string value of the cloud storage data format (json, parquet, etc)
	load_type(str): A string value accepting ['reload', 'append']
Returns:
	A spark dataframe named database_name__schema_name__table_name loaded to target schema defined in DLT pipeline in delta format
"""
 
	accepted_load_types = ["reload", "append"]
	if load_type not in accepted_load_types:
		raise ValueError(f"Load type {load_type} is not in accepted load types ['reload', 'append']")
		
	destination_table_name = database_name + "__" + schema_name + "__" + table_name
	
	if load_type == "reload":
		
		@dlt.table(name=destination_table_name)
		def create_or_reload_delta_table():
			
			df = (
				spark
				.read
				.format(f"{data_format}")
				.load(file_path)
			)
			
			#Do additional transformations here
			
			#Return final df
			return df
		
	if load_type == "append":
	
		@dlt.table(name=destination_table_name)
		def create_or_append_delta_table():
			
			df = (
				spark
				.readStream
				.format("cloudFiles")
				.option("cloudFiles.format", f"{data_format}")
				#Add additional options like inferColumnTypes or schemaEvolutionMode here
				.load(file_path)
			)
			
			#Do additional transformations here
			
			#Return final df
			return df
	
if __name__ == "__main__":
 
	for table in metadata_dict:
		
		database_name = table["database_name"]
		schema_name = table["schema_name"]
		table_name = table["table_name"]
		file_path = table["file_path"]
		data_format = table["data_format"]
		load_type = table["load_type"]
		
		create_delta_table(database_name=database_name, schema_name=schema_name, table_name=table_name, file_path=file_path, data_format=data_format, load_type=load_type)
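
The sketch above covers the reload and append cases. For the merge scenario mentioned in the question, one possible approach outside DLT (an assumption on my part, with hypothetical key, table, and path names) is Auto Loader with foreachBatch and a Delta MERGE:

from delta.tables import DeltaTable

# Possible merge handling: Auto Loader + foreachBatch with a Delta MERGE.
# The key column "id", the target table, and the paths are assumptions;
# the target Delta table is assumed to already exist.
def upsert_to_delta(microbatch_df, batch_id):
    target = DeltaTable.forName(spark, "hive_metastore.default.my_table")
    (
        target.alias("t")
        .merge(microbatch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

(
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/my_table")
    .load("/mnt/raw/my_table")
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/mnt/checkpoints/my_table")
    .trigger(availableNow=True)
    .start()
)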
		

daniel_sahal
Esteemed Contributor

You can create a generic notebook that is parametrized with the table name/source system and then simply trigger the notebook with different parameters (one run per table/source system).

For parametrization you can use dbutils.widgets (https://docs.databricks.com/notebooks/widgets.html).
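
For illustration, a minimal sketch of such a parametrized notebook (the widget names, storage paths, and target table name below are assumptions, not from the reply or the docs):

# Minimal sketch of a parametrized ingestion notebook; widget names, paths,
# and the target table name are assumptions for illustration.
dbutils.widgets.text("source_system", "")
dbutils.widgets.text("table_name", "")
dbutils.widgets.text("data_format", "parquet")

source_system = dbutils.widgets.get("source_system")
table_name = dbutils.widgets.get("table_name")
data_format = dbutils.widgets.get("data_format")

source_path = f"/mnt/raw/{source_system}/{table_name}"              # hypothetical layout
checkpoint_path = f"/mnt/checkpoints/{source_system}/{table_name}"  # hypothetical layout
target_table = f"{source_system}__{table_name}"

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", data_format)
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(source_path)
)

(
    df.writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)  # batch-like run; trigger(once=True) is the older equivalent
    .toTable(target_table)
)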

For orchestration you can either use Databricks Workflows or any other external orchestrator (e.g., Azure Data Factory, Apache Airflow).
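
As a lightweight alternative to an external orchestrator, a small driver notebook could also fan the runs out itself with dbutils.notebook.run (the notebook path and table list below are hypothetical):

# Sketch of a driver notebook that triggers the parametrized notebook once per table.
# The notebook path and the table list are assumptions for illustration.
tables = [
    {"source_system": "erp", "table_name": "customers", "data_format": "parquet"},
    {"source_system": "crm", "table_name": "orders", "data_format": "json"},
]

for t in tables:
    # dbutils.notebook.run(path, timeout_seconds, arguments)
    dbutils.notebook.run("/Repos/ingestion/ingest_single_table", 3600, t)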

Kaniz
Community Manager

Hi @Anil Kovilakar, we haven't heard from you since the last responses from @Daniel Sahal and @Jordan Fox, and I was checking back to see if their suggestions helped you.

Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
