Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Implement Auto Loader to ingest data into Delta Lake; I have 100 different tables with full load, append, and merge scenarios

AK032716
New Contributor

I want to implement Auto Loader to ingest data into Delta Lake from 5 different source systems, and I have 100 different tables in each database. How do we address this dynamically using Auto Loader with the trigger-once option for full load, append, and merge scenarios?

3 REPLIES

Jfoxyyc
Valued Contributor

Since Delta Live Tables handles parallelism for you, I would use a metadata table that defines some variables, read it into a dict, and iterate over the dict in a Delta Live Tables pipeline like so:

"""
Assumptions:
1: A list of dicts named metadata_dict, each with database_name, schema_name, table_name, file_path, data_format, load_type
2: Data ingestion from database -> cloud storage is a separate process that either overwrites the destination or is append-only
3: You already have a session-scoped connection to cloud storage or are using mount points
"""
 
def create_delta_table(database_name: str, schema_name: str, table_name: str, file_path: str, data_format: str, load_type: str) -> None:
 
""" Take table metadata including file path, schema, table name and utilize autoloader to either drop/reload or append to destination delta table.
Note - Delta Live Tables do not currently support Unity catalog three level namespace, hive_metastore catalog will be used.
 
Args:
	database_name(str): A string value of the source database name
	schema_name(str): A string value of the source schema name
	table_name(str): A string value of the source table name
	file_path(str): A string value defining where in cloud storage the table data is located (s3, gcp, adls)
	data_format(str): A string value of the cloud storage data format (json, parquet, etc)
	load_type(str): A string value accepting ['reload', 'append']
Returns:
	None. Registers a DLT table named database_name__schema_name__table_name in the target schema defined in the DLT pipeline, stored in Delta format.
"""
 
	accepted_load_types = ["reload", "append"]
	if load_type not in accepted_load_types:
		raise ValueError(f"Load type {load_type} is not in accepted load types ['reload', 'append']")
		
	destination_table_name = database_name + "__" + schema_name + "__" + table_name
	
	if load_type == "reload":
		
		@dlt.table(name=destination_table_name)
		def create_or_reload_delta_table():
			
			df = (
				spark
				.read
				.format(data_format)
				.load(file_path)
			)
			
			#Do additional transformations here
			
			#Return final df
			return df
		
	if load_type == "append":
	
		@dlt.table(name=destination_table_name)
		def create_or_append_delta_table():
			
			df = (
				spark
				.readStream
				.format("cloudFiles")
				.option("cloudFiles.format", f"{data_format}")
				#Add additional options like inferColumnTypes or schemaEvolutionMode here
				.load(file_path)
			)
			
			#Do additional transformations here
			
			#Return final df
			return df
	
if __name__ == "__main__":
 
	for table in metadata_dict:
		
		database_name = table["database_name"]
		schema_name = table["schema_name"]
		table_name = table["table_name"]
		file_path = table["file_path"]
		data_format = table["data_format"]
		load_type = table["load_type"]
		
		create_delta_table(database_name=database_name, schema_name=schema_name, table_name=table_name, file_path=file_path, data_format=data_format, load_type=load_type)
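
The sketch above only covers the reload and append paths. For the merge scenario from the question, a rough sketch outside DLT (assuming the metadata also carries a key_columns list per table and the destination Delta table already exists) could combine Auto Loader with foreachBatch and a Delta MERGE:

from delta.tables import DeltaTable

def merge_with_autoloader(table_meta: dict) -> None:
	""" Hypothetical merge path: stream new files with Auto Loader and MERGE each micro-batch into the target Delta table.
	Assumes table_meta carries a key_columns list and that the destination table already exists.
	"""

	destination_table_name = table_meta["database_name"] + "__" + table_meta["schema_name"] + "__" + table_meta["table_name"]
	merge_condition = " AND ".join(f"t.{k} = s.{k}" for k in table_meta["key_columns"])

	def merge_batch(batch_df, batch_id):
		(
			DeltaTable.forName(spark, destination_table_name)
			.alias("t")
			.merge(batch_df.alias("s"), merge_condition)
			.whenMatchedUpdateAll()
			.whenNotMatchedInsertAll()
			.execute()
		)

	(
		spark.readStream
		.format("cloudFiles")
		.option("cloudFiles.format", table_meta["data_format"])
		.option("cloudFiles.schemaLocation", table_meta["file_path"] + "/_schema")  # placeholder path
		.load(table_meta["file_path"])
		.writeStream
		.foreachBatch(merge_batch)
		.option("checkpointLocation", table_meta["file_path"] + "/_checkpoint")  # placeholder path
		.trigger(once=True)  # the trigger-once option mentioned in the question
		.start()
	)

The schema and checkpoint locations here are placeholders; in practice you would keep them in a dedicated path per table rather than under the source directory.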
		

daniel_sahal
Esteemed Contributor

You can create a generic notebook that is parameterized with the table name/source system and then simply trigger the notebook with different parameters (one run per table/source system).

For parametrization you can use dbutils.widgets (https://docs.databricks.com/notebooks/widgets.html).

For orchestration you can either use Databricks Workflows or any other external orchestrator (e.g. Azure Data Factory, Apache Airflow).
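
As a rough illustration (the paths and naming convention below are placeholders, not something from this thread), such a generic notebook could read its parameters with dbutils.widgets and run a single Auto Loader stream per table:

dbutils.widgets.text("source_system", "")
dbutils.widgets.text("table_name", "")
dbutils.widgets.text("data_format", "parquet")

source_system = dbutils.widgets.get("source_system")
table_name = dbutils.widgets.get("table_name")
data_format = dbutils.widgets.get("data_format")

# Assumed storage layout and naming convention - adjust to your environment.
source_path = f"/mnt/raw/{source_system}/{table_name}"
checkpoint_path = f"/mnt/checkpoints/{source_system}/{table_name}"
target_table = f"bronze.{source_system}__{table_name}"

(
	spark.readStream
	.format("cloudFiles")
	.option("cloudFiles.format", data_format)
	.option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
	.load(source_path)
	.writeStream
	.option("checkpointLocation", checkpoint_path)
	.trigger(once=True)
	.toTable(target_table)
)

Each Databricks Workflows task (or ADF/Airflow activity) would then call this notebook once per table, passing the table-specific values as notebook parameters.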

Kaniz_Fatma
Community Manager

Hi @Anil Kovilakar, we haven't heard from you since the last responses from @Daniel Sahal and @Jordan Fox, and I was checking back to see if their suggestions helped you.

Otherwise, if you have found a solution, please share it with the community, as it can be helpful to others.

Also, please don't forget to click the "Select As Best" button whenever the information provided helps resolve your question.
