<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: implement autoloader to ingest data into delta lake, I have 100 different tables with full load, append merge scenarios in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-lake-i-have-100/m-p/13659#M8293</link>
    <description>&lt;P&gt;You can create a generic notebook that is parametrized by table name and source system, then trigger that notebook with different parameters for each table/source system.&lt;/P&gt;&lt;P&gt;For parametrization, you can use dbutils.widgets (https://docs.databricks.com/notebooks/widgets.html).&lt;/P&gt;&lt;P&gt;For orchestration, you can use either Databricks Workflows or an external orchestrator (e.g. Azure Data Factory, Apache Airflow).&lt;/P&gt;</description>
    <pubDate>Thu, 05 Jan 2023 07:42:44 GMT</pubDate>
    <dc:creator>daniel_sahal</dc:creator>
    <dc:date>2023-01-05T07:42:44Z</dc:date>
    <item>
      <title>implement autoloader to ingest data into delta lake, I have 100 different tables with full load, append merge scenarios</title>
      <link>https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-lake-i-have-100/m-p/13657#M8291</link>
      <description>&lt;P&gt;I want to implement Auto Loader to ingest data into Delta Lake from 5 different source systems, with 100 different tables in each database. How do we handle this dynamically using Auto Loader with the trigger-once option, covering full load and append merge scenarios?&lt;/P&gt;</description>
      <pubDate>Wed, 04 Jan 2023 15:45:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-lake-i-have-100/m-p/13657#M8291</guid>
      <dc:creator>AK032716</dc:creator>
      <dc:date>2023-01-04T15:45:09Z</dc:date>
    </item>
    <item>
      <title>Re: implement autoloader to ingest data into delta lake, I have 100 different tables with full load, append merge scenarios</title>
      <link>https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-lake-i-have-100/m-p/13658#M8292</link>
      <description>&lt;P&gt;Since Delta Live Tables handles parallelism for you, I would use a metadata table that defines some variables, read those into a dict, and iterate over the dict in a Delta Live Tables pipeline like so:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;"""
Assumptions:
1: metadata_dict is a collection of dicts, one per table, each with the keys database_name, schema_name, table_name, file_path, data_format, load_type
2: Data ingestion from database -&amp;gt; cloud storage is a separate process that either overwrites the destination or only appends to it
3: You already have a session-scoped connection to cloud storage or are using mount points
"""

import dlt  # Delta Live Tables module; `spark` is provided by the Databricks runtime

def create_delta_table(database_name: str, schema_name: str, table_name: str, file_path: str, data_format: str, load_type: str) -&amp;gt; None:
	"""Take table metadata including file path, schema, table name and utilize autoloader to either drop/reload or append to the destination delta table.
	Note - Delta Live Tables do not currently support Unity Catalog three-level namespace, hive_metastore catalog will be used.

	Args:
		database_name(str): A string value of the source database name
		schema_name(str): A string value of the source schema name
		table_name(str): A string value of the source table name
		file_path(str): A string value defining where in cloud storage the table data is located (s3, gcp, adls)
		data_format(str): A string value of the cloud storage data format (json, parquet, etc)
		load_type(str): A string value accepting ['reload', 'append']
	Returns:
		None. Registers a DLT table named database_name__schema_name__table_name, loaded to the target schema defined in the DLT pipeline in delta format
	Raises:
		ValueError: If load_type is not 'reload' or 'append'
	"""

	accepted_load_types = ["reload", "append"]
	if load_type not in accepted_load_types:
		raise ValueError(f"Load type {load_type} is not in accepted load types ['reload','append']")
		
	destination_table_name = database_name + "__" + schema_name + "__" + table_name
	
	if load_type == "reload":
		
		@dlt.table(name=destination_table_name)
		def create_or_reload_delta_table():
			
			df = (
				spark
				.read
				.format(data_format)
				.load(file_path)
			)
			
			#Do additional transformations here
			
			#Return final df
			return df
		
	if load_type == "append":
	
		@dlt.table(name=destination_table_name)
		def create_or_append_delta_table():
			
			df = (
				spark
				.readStream
				.format("cloudFiles")
				.option("cloudFiles.format", data_format)
				#Add additional options like inferColumnTypes or schemaEvolutionMode here
				.load(file_path)
			)
			
			#Do additional transformations here
			
			#Return final df
			return df
	
if __name__ == "__main__":
	for table in metadata_dict:
		
		database_name = table["database_name"]
		schema_name = table["schema_name"]
		table_name = table["table_name"]
		file_path = table["file_path"]
		data_format = table["data_format"]
		load_type = table["load_type"]
		
		create_delta_table(database_name=database_name, schema_name=schema_name, table_name=table_name, file_path=file_path, data_format=data_format, load_type=load_type)
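		&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;As a rough illustration of assumption 1, the metadata could be held as a list of per-table dicts and checked before the loop above runs. This is only a sketch: the table names, file paths, and the validate_metadata helper are hypothetical and are not part of the pipeline code above.&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;# Hypothetical sketch: in practice these entries would be read from the metadata table.
REQUIRED_KEYS = {"database_name", "schema_name", "table_name", "file_path", "data_format", "load_type"}
ACCEPTED_LOAD_TYPES = {"reload", "append"}

# Made-up example entries, one dict per source table.
metadata_dict = [
	{
		"database_name": "sales_db",
		"schema_name": "dbo",
		"table_name": "orders",
		"file_path": "/mnt/raw/sales_db/dbo/orders",
		"data_format": "parquet",
		"load_type": "append",
	},
	{
		"database_name": "hr_db",
		"schema_name": "dbo",
		"table_name": "employees",
		"file_path": "/mnt/raw/hr_db/dbo/employees",
		"data_format": "json",
		"load_type": "reload",
	},
]

def validate_metadata(entries):
	"""Return the destination table name for each entry, raising ValueError on a malformed entry."""
	names = []
	for entry in entries:
		# Every entry must carry all keys that the loop above reads.
		missing = REQUIRED_KEYS - set(entry)
		if missing:
			raise ValueError(f"Metadata entry is missing keys: {sorted(missing)}")
		if entry["load_type"] not in ACCEPTED_LOAD_TYPES:
			raise ValueError(f"Unsupported load type: {entry['load_type']}")
		# Same naming convention as destination_table_name above.
		names.append("__".join([entry["database_name"], entry["schema_name"], entry["table_name"]]))
	return names

destination_names = validate_metadata(metadata_dict)
print(destination_names)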
		&lt;/CODE&gt;&lt;/PRE&gt;</description>
      <pubDate>Wed, 04 Jan 2023 21:36:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-lake-i-have-100/m-p/13658#M8292</guid>
      <dc:creator>Jfoxyyc</dc:creator>
      <dc:date>2023-01-04T21:36:11Z</dc:date>
    </item>
    <item>
      <title>Re: implement autoloader to ingest data into delta lake, I have 100 different tables with full load, append merge scenarios</title>
      <link>https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-lake-i-have-100/m-p/13659#M8293</link>
      <description>&lt;P&gt;You can create a generic notebook that is parametrized by table name and source system, then trigger that notebook with different parameters for each table/source system.&lt;/P&gt;&lt;P&gt;For parametrization, you can use dbutils.widgets (https://docs.databricks.com/notebooks/widgets.html).&lt;/P&gt;&lt;P&gt;For orchestration, you can use either Databricks Workflows or an external orchestrator (e.g. Azure Data Factory, Apache Airflow).&lt;/P&gt;</description>
      <pubDate>Thu, 05 Jan 2023 07:42:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-lake-i-have-100/m-p/13659#M8293</guid>
      <dc:creator>daniel_sahal</dc:creator>
      <dc:date>2023-01-05T07:42:44Z</dc:date>
    </item>
  </channel>
</rss>

