When migrating to Databricks, one of the first steps is loading historical data from on-premises systems or other cloud services into the platform. Getting this data load set up in an automated and efficient way is crucial to executing a tight production cutover. You may have done (or are planning to do) a historical data load for development purposes, but when it's time to go live it really needs to be as fast and automated as possible. Every day after the source database's production snapshot is taken is another day of data catch-up required between your team and the go-live.
I recently led a Databricks team that helped a major North American retailer migrate its primary North American data warehouse and pipelines from on-prem to Azure Databricks. While there were a number of challenges, the one aspect that enabled our success during the stressful go-live period was how well our historical data load went.
If you are following a standard pattern, you will do the historical data load three times: once each for your development, QA, and production environments.
Let's talk about how to make this historical data load as smooth as possible.
Let's outline the requirements for a successful historical load:
Bringing the table data over from the source location can be challenging given that the data may sit on-prem. We considered the following three options:
| Connection Method | Pros | Cons |
| --- | --- | --- |
| Apache Spark™ JDBC/ODBC | No need to use ADF | Not scalable out of the box; requires table information for partitioned reads of large tables |
| ADF Delta Lake Connector | Works for tables of any size | Unable to customize table loads; data quality issues |
| ADF Parquet Connector | Works for tables of any size; able to customize table loads | Requires an extra step afterwards to load the data into Delta Lake format |
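To illustrate the caveat in the first row: a plain Spark JDBC read pulls a table through a single connection unless you supply partitioning information. A minimal sketch of a partitioned read is below; the table name, partition column, and bounds are hypothetical and would have to come from the source system's metadata, which is exactly the per-table work the table above refers to.

# Hypothetical sketch of a partitioned Spark JDBC read from the source database.
# The table, partition column, and bounds are placeholders for illustration only.
password = dbutils.secrets.get(scope="Secrets-Scope", key="scope-key")

large_table_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:netezza://address:port/SOURCE_DB")
    .option("driver", "org.netezza.Driver")
    .option("user", "netezza-user")
    .option("password", password)
    .option("dbtable", "ADMIN.LARGE_TABLE_A")
    .option("partitionColumn", "ID")      # must be a numeric, date, or timestamp column
    .option("lowerBound", "1")
    .option("upperBound", "100000000")
    .option("numPartitions", "32")        # 32 parallel connections to the source
    .load()
)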
Given our project's need to customize data loads, we chose to go with the ADF Parquet Connector, following up with a Databricks notebook that loads the data from Parquet into a pre-defined table schema. Even if you don't have any special requirements, I suggest you do the same so your code stays flexible. Separating the import-to-Parquet step from the load-to-Delta step is also convenient: if there is an issue with a table, you can drop the Delta table and re-load it without having to re-import the underlying data, as sketched below.
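As a concrete (hypothetical) example of that last point, re-loading a single problem table is just a drop and a re-read of the Parquet you already landed; the catalog, schema, table, and path names here are placeholders.

# Hypothetical sketch: re-create one Delta table from Parquet already landed in ADLS,
# without re-exporting anything from the source system. All names are placeholders.
landed_path = "abfss://container@account.dfs.core.windows.net/History/TABLE_A"

spark.sql("DROP TABLE IF EXISTS dev_catalog.target_schema.table_a")

(spark.read.parquet(landed_path)
     .write.mode("overwrite")
     .saveAsTable("dev_catalog.target_schema.table_a"))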
To achieve this we constructed an ADF pipeline that looked like this:
Key notes about this approach:
The activities handled as part of this pipeline were:
We will go into these in more detail below.
The overall pipeline parameters looked like:
"parameters": {
"CastTableNames": {
"type": "array",
"defaultValue": [
"CAST_TABLE_A: cast(COL1 AS VARCHAR(100)) COL1, COL2, COL3, ..., cast(COL19 AS VARCHAR(100)) COL19, COL20",
…
"CAST_TABLE_Z: CAST(COL1 AS VARCHAR(250)) AS COL1, CAST(COL2 AS VARCHAR(250)) AS COL2, COL3"
]
},
"SchemaName": {
"type": "string",
"defaultValue": "ADMIN"
},
"EmailId": {
"type": "string",
"defaultValue": ...
},
"SmallTableNames": {
"type": "array",
"defaultValue": ["SMALL_TABLE_A",…, "SMALL_TABLE_Z"]
},
"LargeTableNames": {
"type": "array",
"defaultValue": ["LARGE_TABLE_A",…,"LARGE_TABLE_Z"]
},
"LandingLocation": {
"type": "string",
"defaultValue": "History"
},
"catalog": {
"type": "string",
"defaultValue": "dev_qa_or_prod_catalog"
},
"schema": {
"type": "string",
"defaultValue": "target_schema"
},
"historical_data_location": {
"type": "string",
"defaultValue": "History"
},
"notebook_path": {
"type": "string",
"defaultValue": "/src/databricks/python/notebooks/utils/"
},
"adls_landing_url": {
"type": "string",
"defaultValue": "abfss://container@account.dfs.core.windows.net/"
},
"email_scope": {
"type": "string",
"defaultValue": "Secrets-Scope"
},
"email_notebook_path": {
"type": "string",
"defaultValue": "/src/databricks/python/notebooks/common/send_email_history"
}
}
To give an idea of how this pipeline performed, and what you might expect with an appropriately sized Integration Runtime and Databricks clusters, our performance was as follows:
For each of the activity types mentioned above, here is information regarding the input parameters as well as considerations for each type of activity. While the actual notebooks cannot be shared, this should provide enough detail for you to replicate this setup successfully.
The copy activity was straightforward. We always ran it inside a ForEach activity where @item() resolved to a table name, and used this to construct the ODBC query: "@concat('select * from ',item())".
The JSON looked like:
{
    "name": "Copy table data",
    "type": "Copy",
    "dependsOn": [],
    "policy": {
        "timeout": "7.0:00:00",
        "retry": 1,
        "retryIntervalInSeconds": 900,
        "secureOutput": false,
        "secureInput": false
    },
    "userProperties": [],
    "typeProperties": {
        "source": {
            "type": "OdbcSource",
            "query": {
                "value": "@concat('select * from ',item())",
                "type": "Expression"
            },
            "queryTimeout": "01:30:00"
        },
        "sink": {
            "type": "ParquetSink",
            "storeSettings": {
                "type": "AzureBlobFSWriteSettings"
            },
            "formatSettings": ...
        },
        "enableStaging": true,
        "stagingSettings": {
            "linkedServiceName": {
                ...
            },
            "path": ...
        },
        "parallelCopies": 30,
        ...
    },
    "inputs": [
        {
            "referenceName": "OdbcHistoricalDataLoad",
            "type": "DatasetReference",
            "parameters": {
                "TableName": {
                    "value": "@item()",
                    "type": "Expression"
                },
                "SchemaName": {
                    "value": "@pipeline().parameters.SchemaName",
                    "type": "Expression"
                }
            }
        }
    ],
    "outputs": [
        {
            "referenceName": "Parquet",
            "type": "DatasetReference",
            "parameters": {
                "TableName": {
                    "value": "@item()",
                    "type": "Expression"
                },
                "LandingLocation": {
                    "value": "@pipeline().parameters.LandingLocation",
                    "type": "Expression"
                }
            }
        }
    ]
}
This is the activity where we loaded the data from Parquet to Delta and applied our custom logic as needed.
Here is a simplified version of the Databricks notebook used:
from pyspark.sql.functions import col, rtrim
from pyspark.sql.types import DateType, TimestampType
import os

catalog_name = dbutils.widgets.get('catalog_name')
schema_name = dbutils.widgets.get('schema_name')
historical_data_location = dbutils.widgets.get('historical_data_location')
table_type = dbutils.widgets.get("table_type")

spark.sql(f"use catalog {catalog_name}")
spark.sql(f"use database {schema_name}")

cast_tables = ["cast_table_a", "cast_table_b", ...]
small_tables = ["small_table_a", "small_table_b", ...]
large_tables = ["large_table_a", "large_table_b", ...]

if table_type == 'cast':
    table_list = cast_tables
elif table_type == 'small':
    table_list = small_tables
elif table_type == 'large':
    table_list = large_tables

# write each table to Delta, applying the custom transformations needed for our project
for table_name in table_list:
    try:
        # read in the landed Parquet data as a DataFrame
        historical_data_df = spark.read.parquet(os.path.join(historical_data_location, table_name))
        print(f"Loading Table: {table_name}")
        # use the pre-defined target table's schema (minus the source rowid column)
        target_schema = spark.read.table(table_name).drop('rowid').schema
        # cast columns to the target types; rtrim string columns
        df_cast = historical_data_df.select([
            col(c.name).cast(c.dataType)
            if str(c.dataType) != "StringType()"
            else rtrim(col(c.name)).cast(c.dataType).alias(c.name)
            for c in target_schema
        ])
        # write to the Delta table
        df_cast.write.mode("overwrite").saveAsTable(table_name)
    except Exception as e:
        # Print the error message
        print("********An error occurred while reading data from Parquet: {} *********".format(str(e)))

# address a bug we found with time type conversions
for table in table_list:
    date_timestamp_columns = []
    df = spark.table(table)
    for column in df.columns:
        if isinstance(df.schema[column].dataType, (DateType, TimestampType)):
            date_timestamp_columns.append(column)
    for column in date_timestamp_columns:
        if spark.sql(f"select * from {table} where date({column}) = '0001-01-03'").count() != 0:
            print(f"Running Update for table {table} and column {column}")
            spark.sql(f"UPDATE {table} SET {column} = {column} - interval 2 days WHERE date({column}) = '0001-01-03'")
The parameters look like:
{
    "table_type": "one of the options: cast, small, or large",
    "catalog_name": {
        "value": "@pipeline().parameters.catalog",
        "type": "Expression"
    },
    "schema_name": {
        "value": "@pipeline().parameters.schema",
        "type": "Expression"
    },
    "historical_data_location": {
        "value": "@concat(pipeline().parameters.adls_landing_url,pipeline().parameters.historical_data_location)",
        "type": "Expression"
    }
}
This is the activity where we validated the tables loaded to Delta by using a JDBC connection to the source (Netezza in our case). Pay special attention to the column type matching in the validate_table() function, as it was carefully developed to true up values between two disparate systems; you will need to think this through carefully for your own project. You can also see that the rtrim logic is encoded here as well. We had a separate notebook to address single tables for the large table thread.
As you will see below, we validated the tables without a join key by using Spark's exceptAll() DataFrame method. This was the best solution we could find given that we did not have join keys. The algorithm simply checks whether each row in the left_df DataFrame matches a row in the right_df DataFrame:
left_df.exceptAll(right_df)
We persisted the non-matching rows for each table to a different location.
We certainly don't want to pull many gigabytes or terabytes of data out of Netezza, so by default the code only pulled 1 million records per table. Since we were using exceptAll() as our validation engine, and the Netezza DataFrame was on the left, this did not pose any problems.
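To make the exceptAll() semantics concrete, here is a tiny self-contained example with made-up data (not from the project): it returns the rows of the left DataFrame that have no match on the right, respecting duplicates.

# Tiny illustration of exceptAll() with made-up data.
left_df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", "val"])
right_df = spark.createDataFrame([(1, "a"), (3, "c")], ["id", "val"])

# One (1, "a") is cancelled out by the right side; the second copy and (2, "b") remain.
left_df.exceptAll(right_df).show()
# +---+---+
# | id|val|
# +---+---+
# |  1|  a|
# |  2|  b|
# +---+---+  (row order may vary)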
Here is a simplified version of the Databricks notebook used:
from pyspark.sql.functions import col

dbutils.widgets.text('CATALOG', '')
CATALOG = dbutils.widgets.get('CATALOG')
dbutils.widgets.text('SCHEMA', '')
SCHEMA = dbutils.widgets.get('SCHEMA')
dbutils.widgets.text('validation_schema', 'db_validation_results')
validation_schema = dbutils.widgets.get('validation_schema')
dbutils.widgets.text('type', 'tables')
type = dbutils.widgets.get('type')
dbutils.widgets.text('scope', 'secret-scope')
scope = dbutils.widgets.get('scope')
dbutils.widgets.text('key', 'scope-key')
key = dbutils.widgets.get('key')

# password from the secret scope
password = dbutils.secrets.get(scope=scope, key=key)

# COMMAND ----------

# MAGIC %sql
# MAGIC USE ${CATALOG}.${SCHEMA};

# COMMAND ----------

small_tables = [...]
large_tables = [...]
cast_tables = [...]
views_list = [...]

# COMMAND ----------

def validate_table(spark, table: str, limit: int, ntz_schema: str, db_schema: str, validation_schema: str, catalog: str, password):
    print(f"Loading Netezza {table}")
    properties = {
        "url": f"jdbc:netezza://address:port/{ntz_schema}",
        "user": "netezza-user",
        "password": password,
        "driver": "org.netezza.Driver"
    }

    # compare row counts first
    nz_count = spark.read.format("jdbc") \
        .options(url=properties["url"],
                 user=properties["user"],
                 password=properties["password"],
                 query=f"""SELECT COUNT(1) FROM admin.{table.upper()}""",
                 driver=properties["driver"]) \
        .load() \
        .collect()[0][0]
    db_count = spark.sql(f"SELECT COUNT(1) FROM {catalog}.{db_schema}.{table}").collect()[0][0]
    print(f"Netezza object count is {nz_count}")
    print(f"Databricks object count is {db_count}")
    if nz_count != db_count:
        print(f"Check failed, Netezza has {nz_count} but Databricks has {db_count}")
        return False

    # Get the column names and data types from Netezza
    query_columns = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table.upper()}'"
    # Read the column names into a DataFrame
    column_df = spark.read \
        .format("jdbc") \
        .options(url=properties["url"],
                 user=properties["user"],
                 password=properties["password"],
                 query=query_columns,
                 driver=properties["driver"]) \
        .load()
    # Extract the column names from the DataFrame
    columns = [(row[0], row[1]) for row in column_df.collect()]

    # handle various data types on the Netezza side
    def cast_netezza_types(column: tuple):
        match column[1]:
            case 'DOUBLE PRECISION':
                return column[0]
            case other if "CHAR" in other:
                return f"rtrim({column[0]}) AS {column[0]}"
            case other if other.startswith("NUMERIC"):
                return column[0]
            case _:
                return f"CAST({column[0]} AS VARCHAR(250)) AS {column[0]}"

    # Construct the query with CAST to convert all columns to string
    cast_columns = [cast_netezza_types(column) for column in columns]
    cast_query = f"SELECT {', '.join(cast_columns)} FROM {table} LIMIT {limit}"

    # read the Netezza table, casting each column as above
    netezza_df = spark.read.format("jdbc") \
        .options(url=properties["url"],
                 user=properties["user"],
                 password=properties["password"],
                 query=cast_query,
                 driver=properties["driver"]) \
        .load()

    # read the Databricks table and cast every column to string the same way
    databricks_df = spark.read.table(f"{catalog}.{db_schema}.{table}").drop("ROWID")

    def cast_spark_types(column_name: str):
        data_type = str(databricks_df.schema[column_name].dataType)
        match data_type:
            case 'DoubleType()':
                return col(column_name)
            case other if other.startswith("DecimalType"):
                return col(column_name)
            case 'StringType()':
                return col(column_name)
            case _:
                return col(column_name).cast("string").alias(column_name)

    databricks_str_df = databricks_df.select(*[
        cast_spark_types(column_name)
        for column_name in databricks_df.columns
    ])

    # compare them
    if len(netezza_df.columns) != len(databricks_str_df.columns):
        raise Exception(f"Schema difference, Netezza version has {len(netezza_df.columns)} columns but Databricks has {len(databricks_str_df.columns)}")

    nz_count = netezza_df.cache().count()
    db_count = databricks_str_df.cache().count()
    if nz_count != db_count and nz_count < limit:
        raise Exception(f"Count difference, Netezza version has {nz_count} rows but Databricks has {db_count}")

    except_result = netezza_df.na.fill('').exceptAll(databricks_str_df.na.fill(''))
    if except_result.count() == 0:
        print("Test passed")
        netezza_df.unpersist()
        databricks_str_df.unpersist()
        return True
    else:
        diff_table = f"{catalog}.{validation_schema}.{table}_difference"
        print(f"********* Differences detected for: {table}")
        except_result.write.option("mergeSchema", "true").mode("overwrite").saveAsTable(diff_table)
        netezza_df.unpersist()
        databricks_str_df.unpersist()
        print(f"\tSee differences in {diff_table}")
        return False

# COMMAND ----------

object_list = []
ntz_schema = 'your_source_schema'
if type == 'cast':
    object_list = cast_tables
elif type == 'small':
    object_list = small_tables
elif type == 'large':
    object_list = large_tables
elif type == 'views':
    object_list = views_list

# COMMAND ----------

mismatched_list = []
matched_list = []
error_table = []
for table in object_list:
    try:
        if validate_table(spark, table, 1000000, ntz_schema, SCHEMA, 'db_validation_results', CATALOG, password):
            matched_list.append(table)
        else:
            mismatched_list.append(table)
    except Exception as e:
        # Print the error message
        print(f"********An error occurred while validating {table}: {str(e)} *********")
        error_table.append(table)
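We won't share the exact reporting code, but a sketch of how you might surface these results so the ADF pipeline (and the email notification step) reacts to failures could look like this; failing the notebook is an assumption about how you want the pipeline to behave, not part of the original notebook.

# Hypothetical wrap-up cell: summarize results and fail the notebook (and thus
# the ADF activity) if anything did not validate cleanly.
print(f"Matched: {len(matched_list)}, Mismatched: {len(mismatched_list)}, Errors: {len(error_table)}")

if mismatched_list or error_table:
    raise Exception(f"Validation failed for tables: {sorted(set(mismatched_list + error_table))}")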
If you've followed the previous notebooks, this one works similarly but is much simpler. We had a separate notebook to address single tables for the large table thread.
# Databricks notebook source
dbutils.widgets.text('CATALOG', '')
CATALOG = dbutils.widgets.get('CATALOG')
dbutils.widgets.text('SCHEMA', '')
SCHEMA = dbutils.widgets.get('SCHEMA')
dbutils.widgets.text('type', 'tables')
type = dbutils.widgets.get('type')

# COMMAND ----------

# MAGIC %sql
# MAGIC USE ${CATALOG}.${SCHEMA};

# COMMAND ----------

small_tables = [...]
large_tables = [...]
cast_tables = [...]

# COMMAND ----------

object_list = []
if type == 'cast':
    object_list = cast_tables
elif type == 'small':
    object_list = small_tables
elif type == 'large':
    object_list = large_tables

# COMMAND ----------

for table in object_list:
    print(f"Optimizing Table: {table}")
    spark.sql(f"OPTIMIZE {table}")
If you've read this far, I hope you have seen at least one thing that gives you pause and makes you want to dig in. For instance:
The Great Historical Data Load pipeline you create will likely look different from this one because of differing requirements or complexity. In general, keep these considerations in mind: