Authors: Benita Owoghiri and Ofer Ohana
Introduction
Migrating data from enterprise data warehouses like Amazon Redshift, Google BigQuery, and on-premise data warehouses is a common challenge for many organizations. These external data sources often contain multiple datasets requiring repeated ingestion tasks. Traditionally, this is handled with repetitive code and pipelines, increasing development and maintenance efforts.
Moreover, managing the entire process in a single notebook pipeline can make it difficult to monitor individual tasks, leading to challenges in identifying errors or tracking the progress of specific table migrations. Databricks Workflows and the "For Each" loop offer an effective solution to this problem. By dynamically iterating over datasets, you can automate repeated tasks while improving visibility and monitoring for individual migrations.
In this post, weāll explore how to use the "For Each" loop in Databricks Workflows to streamline the migration of datasets from federated sources and address common migration challenges.
The diagram illustrates a āFor Eachā task that has a notebook task configured and references a list of three values provided by job parameters. We will instantiate a notebook for each set of parameters provided by the user.
Example Databricks āFor Eachā Workflow
Challenge
Imagine you are part of the data engineering team at a global e-commerce company that processes sales data from multiple datasets. You're tasked with migrating this data from an external Amazon Redshift database into Databricks. The goal is to streamline the workflow to read and write these tables into the Delta Lakehouse.
Solution
To simplify the ingestion process, youāll need to automate the following tasks:
- Creating Table Mappings:
Ingest raw data from various tables by reading the table list from Redshift. Create a mapping between Redshift and Databricks based on naming conventions, then dynamically save these mappings as the table_namespaces task parameter.
- For Each Task:
Using a āfor eachā task gives you the ability to execute a processing task sequentially or in parallel according to the table mapping. To learn more about āfor eachā task, check out the documentation.
Putting it all together:
Your Databricks Workflow will utilise the "For Each" loop to automate these operations for all tables, ensuring efficient and repeatable ingestion into the Delta Lakehouse. Your workflow will end up looking something like this:
Prerequisites
- Enable Unity Catalog on the workspace.
- Create a connection to your external source using Lakehouse Federation. To learn how to do this, check out Lakehouse Federation Documentation
- Create a target catalog for the data target location. See Create Catalog
Step 1: Create a Databricks Job
- Navigate to Databricks Workflows by clicking "Workflows" in the left sidebar.
- Click the "Create Job" button in the upper right corner of the window.
- Name your job in the top left corner, e.g., "migrations_pipeline"
Step 2 - Initial Task of Reading Redshift Table List Dynamically
A Databricks job consists of one or more tasks, each representing a specific unit of work. This allows you to chain multiple tasks together and set dependencies. In our example, we will set up the first task to extract the source tables from the information schema on Amazon Redshift and then pass the list of tables to subsequent tasks in the job using task values.
- Create a notebook
- Name the task āextract_tablesā
- In the Type dropdown menu, select Notebook.
- Select the location for where the Notebook code resides in the Source field
- In the compute, choose a compute cluster created or choose Serverless
- Write the function get_table_names that queries Redshift to retrieve a list of tables from Redshift's information_schema
Most enterprise data warehouses store metadata about tables, schemas, and catalogs in a specific schema. Since this example uses Amazon Redshift, our notebook will read from Redshiftās information_schema to gather the necessary metadata for the migration.
In your notebook, add the following code:
#Function to extract table names from Amazon Redshift using the information schema
def get_table_names(schema_name, table_name_pattern):
query = f"(SELECT table_name FROM information_schema.tables WHERE table_schema = '{schema_name}' AND table_name LIKE '{table_name_pattern}') AS table_list"
df_tables = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("query", query) \
.option("user", redshift_user) \
.option("password", redshift_password) \
.load()
return df_tables
|
- Add the function generate_unity_catalog_target that takes the table information from Redshift and creates corresponding target table paths in Unity Catalog.
# Function to generate Unity Catalog target namespaces
def generate_unity_catalog_target(catalog_name, df_tables):
target_paths = []
# Iterate over the tables in the DataFrame
for row in df_tables.collect():
table_name = row['table_name'] # Extract table name from the row
schema_name = row['table_schema'] # Extract schema name from the row
# Generate the target path
target_path = f"{catalog_name}.{schema_name}.{table_name}"
target_paths.append(target_path)
print(f"Target path: {target_path}")
return target_paths
|
- Create the function generate_namespace_mapping. This function will create a dictionary to map the table in Redshift to the table in Databricks.
# Function to generate a JSON dictionary for source and target namespaces
def generate_namespace_mapping(source_catalog, source_schema, target_uc_catalog, df_tables):
table_list = []
# Iterate over the tables in the DataFrame
for row in df_tables.collect():
table_name = row['table_name']
schema_name = row['table_schema']
# Define the source and target namespaces
source_namespace = f"{source_catalog}.{source_schema}.{table_name}"
target_namespace = f"{target_uc_catalog}.{schema_name}.{table_name}"
# Append to the table list
table_list.append({
"source_namespace": source_namespace,
"target_namespace": target_namespace
})
# Convert the list to a JSON string (if needed)
table_list_json = json.dumps(table_list, indent=4)
print(f"Namespace mappings: {table_list_json}")
return table_list
|
- Pass the list of table mapping to the next Workflow task using task values.
- Use dbutils.jobs.taskValues.set(key, value) to set the task value.
- The key is the name that downstream tasks will use to look up this task value. This name must be unique to the task.
- The value is the information youād like to pass to downstream tasks. This command must be able to represent the value internally in JSON format, and its JSON representation cannot exceed 48 KiB.
external_location = "redshiftsourcectlg.information_schema.tables"
source_redshift_catalog = "dev"
source_redshift_schema = "public"
target_uc_catalog = "production_sales_data"
# Get source table names
df_tables = get_table_names(external_location, source_redshift_catalog, source_redshift_schema)
# Generate the namespace mapping
namespace_mappings = generate_namespace_mapping(source_redshift_catalog, source_redshift_schema, target_uc_catalog, df_tables)
#Set task values to be passed to the next for each task
dbutils.jobs.taskValues.set(key = "table_namespaces", value = namespace_mappings)
|
Step 4 - Configure the "For Each" Task
The āFor Eachā task will run a notebook for each value in its āinputsā parameter. To set it up, follow these steps:
- Click the Add Task button in the center of the workflow graph.
- In the Task name field, enter a name like ingest_data_notebook.
- From the Type dropdown menu, select For Each.
- In the Inputs text box, define the values for the task to iterate on. Use the syntax {{tasks.<task_name>.values.<task_value_name>}} to reference the task values from the earlier task.
{{tasks.extract_tables.values.table_namespaces}}
|
- Concurrency - If you want to run several iterations of the notebook concurrently, then you can input a higher concurrency number than the default of 1.
- Click Add a task to loop over on the bottom right in order to add the task we will be looping over.
Step 5 - Configure the Nested Task - Notebook Example
Nested Tasks have many of the same configuration options as standard tasks; however, only one task can be defined under for each. For this example, we use a notebook but you can also use SQL, dbt, Python scripts, or even another pipeline.
- In the Type dropdown, select Notebook and choose the appropriate notebook from your workspace.
- In our notebook, we can reference the parameters passed into the task using the Databricks widgets utility.
# Get the source and target table names from job parameters
source_table = dbutils.widgets.get("source_namespace")
target_table = dbutils.widgets.get("target_namespace")
|
- Now, we will read from Redshift and write to the table in Databricks. Weāre using federated queries to read from Redshift tables, which can be read like any other table in Databricks.
# Ensure the target table exists, if not create an empty one with the same schema
if not spark.catalog.tableExists(target_table):
# Create an empty target table with the same schema as the source table by using the condition where 1 = 0
source_df = spark.table(source_table)
spark.sql(f"CREATE TABLE {target_table} AS SELECT * FROM {source_table} WHERE 1=0")
# Perform the incremental insert using SQL
spark.sql(f"""
INSERT INTO {target_table}
SELECT * FROM {source_table} src
WHERE src.created_at > NVL((SELECT MAX(tgt.created_at) FROM {target_table} tgt), '1900-01-01')
""")
|
- In the task settings, to reference the inputs in the for-each task, we use the {{input}} reference to set the value to the array value of each iteration or {{input.<key>}} to reference individual object fields when you iterate over a list of objects.
For this example we use {{input.source_namespace}} and {{input.target_namespace}}
- āOn the top right hand, choose Run now, to execute the pipeline
Step 6 - Monitoring
- On the top left of the job screen, navigate to the Runs tab to view a graph and table of each pipeline run. Here, you can track the total duration of the pipeline, the status, and any error codes.
- Click on your most recent run to visualise the progress of each task in the pipeline and their overall status in a graph.
- Click on the āfor eachā task specifically. Here you will have clear visibility into each iteration's performance. Users can easily track start and end times, durations, and statuses gaining quick insights into how long each loop took to execute. As you can see for 2 out of 3 of the tables, it took twice as long to migrate the data. This visibility is one of the main benefits of the Databricks' "For Each" task.
- You can also view the input parameters in the table. In this example, the source_namespace and target_namespace are included in the table.
- For every run, you can also drill into the execution and view the run and associated parameters passed.
Benefits of āFor Eachā
- Parallel Execution: It allows for parallel execution of tasks which is useful for running a job across multiple datasets, partitions, or tenants in parallel without needing to create separate jobs or tasks for each instance. This improves efficiency and reduces total execution time compared to sequential execution.
- Dynamic Task Creation: Instead of defining individual tasks manually, forEach dynamically generates tasks based on the input list or collection. This reduces the effort of predefining numerous jobs. There's less need for complex branching logic or multiple pipelines. You define the logic once, and it is applied to each element. Without forEach, youād need to manually create multiple tasks for each instance, leading to duplicated logic and more maintenance overhead.
- Improved Monitoring and Observability: For Each in Databricks, Workflows provides granular monitoring of each parallel task individually. This means you can track each task's progress, logs, and status within the loop independently, all from a single job dashboard. Without For Each, manually created tasks or jobs might require monitoring each separately, adding complexity to understanding the state of the entire workflow.
Conclusion
In this post, we discussed how customers with many tables can migrate data using Lakehouse Federation and the "For Each" loop in Databricks Workflows. The "For Each" loop in Databricks Workflows simplifies handling repetitive, parameter-driven tasks, which boosts efficiency and scalability.
We used a simplified example to illustrate how this can be applied to real-world scenarios using Databricks best practices and discussed the benefits of using this method compared to other common solutions.