cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
Dan_Z
Honored Contributor
Honored Contributor

Introduction

When migrating to Databricks, one of the first steps is loading historical data from on premise or other cloud services into the platform. Getting this data load set up in an automated and efficient way is crucial to executing a tight production cutover. You may have done (or are planning to do) a historical data load for development purposes, but when it's time to go live it really needs to be as fast and automated as possible. Every day after the source database's production snapshot is taken is another data of data catchup required between your team and the go-live.

I recently led a team for a major North American retailer in which Databricks helped migrate the customer's primary North American data warehouse and pipelines from on-prem to Azure Databricks. While there were a number of challenges, the one aspect that enabled our success during the stressful go-live period was how well our historical data load went. 

If you are following a standard pattern, you will do historical data load three times:

  • Into DEV: at the beginning of the project when you are loading the data for each table into Azure Databricks so that you have comprehensive production data for the development of Azure Databricks pipelines,
  • Into QA: for QA testing to make sure your pipelines run outside of their development environment, and
  • Into PROD: as part of go-live. Going live can be like "catching a frisbee" (as I like to say, you can quote me), so at this point the historical data load must be tight.

Let's talk about how make this historical data load as great as possible.

Requirements of a great historical data load

Let's outline what the requirements for a successful historical load are:

  • Fully automated: historical data load can take days, we don't want to be up in the middle of the night babysitting it.
  • All the way from source to delta lake: must take data from tables in the source database all the way to Delta Lake in Azure Databricks.
  • Custom casting: must be able to handle type mapping from the source system and Azure Databricks, or resolve quirks if data is loaded incorrectly.
  • DDLs: script(s) must run Delta Lake table DDLs so that special table properties like DEFAULT values, GENERATED columns, or special Delta configurations can be applied.
  • VIEWs: any VIEWs in the target system must be created.
  • Data validation: after the import, a script must run from Databricks to validate that what was imported is the same as what should have been imported.
  • OPTIMIZE: the tables should be OPTIMIZED for best performance
  • Large table handling: must handle large tables in a special way as to ensure they both are a) prioritized with regard to resources, and b) don't hold up the completion and validation of smaller tables. 

Options

Bringing the table data from the source location can be challenging given that the data may sit on-prem. We consider the following three options:

Connection Method Pros Cons
Apache Spark (TM) JDBC/ODBC No need to use ADF Not scalable out of the box, requires table information for partitioned read of large tables
ADF Delta Lake Connector Works for any size tables

Unable to customize table loads

Data quality issues

ADF Parquet Connector

Works for any size tables

Able to customize table loads

Requires extra step afterwards to load data into Delta Lake format

Given the need in our project to customize data loads for the following reasons:

  • rectify issues caused by type mapping
  • the need to rtrim string columns
  • set default values for Delta table columns
  • set GENERATED columns

We chose to go with the ADF Parquet Connector, and then follow up with a Databricks notebook that would load the data from parquet into a pre-defined table schema. Even if you don't have any special needs, I suggest you do the same so that you can make your code more flexible. It can also be nice to separate the step of importing to Parquet to the step to load to Delta, since if there is an issue with a table you can just drop the Delta table and re-load it without having to re-import the underlying data.

Solution Overview

To achieve this we constructed an ADF pipeline that looked like this:

Screenshot 2024-03-29 at 10.26.06 AM.png

 Key notes about this approach:

  • We handle small tables, large tables, and tables requiring custom code (cast tables) separately.
  • Large tables start loading first in a separate thread, so that they can grab all the resources from the Integration Runtime before smaller tables start running. We put a Wait activity ahead of the other thread to achieve this.
  • The pipeline is robust to Copy activity failures. If a table fails we append the table name to a variable and use that to send an email with the failed tables.
  • The Copy activity will bring data into Parquet, the LoadHistoricalDataInto* notebooks will load to Delta.

Types of Activities

The activities that were handled as part of this pipeline were:

  • Copy: ODBC source to Parquet sink
  • Load to table(s): Databricks notebook to load Parquet data to managed Delta, handle custom logic like type casting and schema issues. 
  • Validate table(s): Databricks notebook to connect back to ODBC source and validate tables was brought in correctly. Also run individually after each large table completes so tables are available before historical data load is completed.
  • OPTIMIZE table(s): Databricks notebook that runs OPTIMIZE on the tables.
  • Email: notify if there is a failure.

We will go into these in more detail below.

Pipeline Parameters

The overall pipeline parameters looked like:

 

"parameters": {
    "CastTableNames": {
        "type": "array",
        "defaultValue": [
            "CAST_TABLE_A: cast(COL1 AS VARCHAR(100)) COL1, COL2, COL3, ..., cast(COL19 AS VARCHAR(100)) COL19, COL20",
            …
            "CAST_TABLE_Z: CAST(COL1 AS VARCHAR(250)) AS COL1, CAST(COL2 AS VARCHAR(250)) AS COL2, COL3"
        ]
    },
    "SchemaName": {
        "type": "string",
        "defaultValue": "ADMIN"
    },
    "EmailId": {
        "type": "string",
        "defaultValue": ...
    },
    "SmallTableNames": {
        "type": "array",
        "defaultValue": ["SMALL_TABLE_A",…, "SMALL_TABLE_Z"]
    },
    "LargeTableNames": {
        "type": "array",
        "defaultValue": ["LARGE_TABLE_A",…,"LARGE_TABLE_Z"]
    },
    "LandingLocation": {
        "type": "string",
        "defaultValue": "History"
    },
    "catalog": {
        "type": "string",
        "defaultValue": "dev_qa_or_prod_catalog"
    },
    "schema": {
        "type": "string",
        "defaultValue": "target_schema"
    },
    "historical_data_location": {
        "type": "string",
        "defaultValue": "History"
    },
    "notebook_path": {
        "type": "string",
        "defaultValue": "/src/databricks/python/notebooks/utils/"
    },
    "adls_landing_url": {
        "type": "string",
        "defaultValue": "abfss://container@account.dfs.core.windows.net/"
    },
    "email_scope": {
        "type": "string",
        "defaultValue": "Secrets-Scope"
    },
    "email_notebook_path": {
        "type": "string",
        "defaultValue": "/src/databricks/python/notebooks/common/send_email_history"
    }
}

 


Performance

To give an idea of how this pipeline performed, and what you might expect with an appropriately sized Integration Runtime and Databricks clusters, our performance was as followed:

  • ~1,100 tables, most of which were small
  • ~100 larger tables
  • 4.5 TB
  • Most tables (minus the large ones) finished within 6 hours, finishing the complete pipeline for all the tables took ~33 hours

Activity Details

For each of the activity types mentioned above, here is information regarding the the input parameters as well as considerations for each type of activity. While the actual notebooks cannot be shared, this should provide enough detail so that you can replicate this setup with success.

Copy activity

The copy activity was straight forward. We always ran this in a for loop activity where @item() resolved to a table name. You can see that we used this to construct a query on ODBC like: "@concat('select * from ',item())".

Considerations

  • Always feed in your destination locations (in this case, landing) from the overall pipeline params.
  • Save the parquet data in a safe location that wont be touched. This is just staging, not final location, so can be used as a backup.

The json looked like:

 

{
  "name": "Copy table data",
  "type": "Copy",
  "dependsOn": [],
  "policy": {
    "timeout": "7.0:00:00",
    "retry": 1,
    "retryIntervalInSeconds": 900,
    "secureOutput": false,
    "secureInput": false
  },
  "userProperties": [],
  "typeProperties": {
    "source": {
      "type": "OdbcSource",
      "query": {
        "value": "@concat('select * from ',item())",
        "type": "Expression"
      },
      "queryTimeout": "01:30:00"
    },
    "sink": {
      "type": "ParquetSink",
      "storeSettings": {
        "type": "AzureBlobFSWriteSettings"
      },
      "formatSettings": ...
    },
    "enableStaging": true,
    "stagingSettings": {
      "linkedServiceName": {
        ...
      },
      "path": ...
    },
    "parallelCopies": 30,
    ...
  },
  "inputs": [
    {
      "referenceName": "OdbcHistoricalDataLoad",
      "type": "DatasetReference",
      "parameters": {
        "TableName": {
          "value": "@item()",
          "type": "Expression"
        },
        "SchemaName": {
          "value": "@pipeline().parameters.SchemaName",
          "type": "Expression"
        }
      }
    }
  ],
  "outputs": [
    {
      "referenceName": "Parquet",
      "type": "DatasetReference",
      "parameters": {
        "TableName": {
          "value": "@item()",
          "type": "Expression"
        },
        "LandlingLocation": {
          "value": "@pipeline().parameters.LandingLocation",
          "type": "Expression"
        }
      }
    }
  ]
}

 


Load to tables

This is the activity where we loaded from parquet to Delta and applied our custom logic as needed.

Here is a simplified version of the Databricks notebook used:

 

from pyspark.sql.functions import col,rtrim
from pyspark.sql.types import DateType, TimestampType
import os

catalog_name = dbutils.widgets.get('catalog_name')
schema_name = dbutils.widgets.get('schema_name')
historical_data_location = dbutils.widgets.get('historical_data_location')
table_type = dbutils.widgets.get("table_type")

spark.sql(f"use catalog {catalog_name}")
spark.sql(f"use database {schema_name}")

cast_tables = ["cast_table_a","cast_table_b",...]
small_tables = ["small_table_a","small_table_b",...]
large_tables = ["large_table_a","large_table_b",...]

if table_type == 'cast':
    table_list = cast_tables
elif table_type == 'small':
    table_list = small_tables
elif table_type == 'large':
    table_list = large_tables

# write the table to Delta, but do some custom transformations needed for our project
for table_name in table_list:
    # read in parquet data as a dataframe
    try:
        historical_data_df = spark.read.parquet(os.path.join(historical_data_location,table_name))
        print(f"Loading Table: {table_name}")
        target_schema = spark.read.table(table_name).drop('rowid').schema
        
        # cast columns- do an rtrim on string columns
        df_cast = historical_data_df.select([col(c.name).cast(c.dataType) if str(c.dataType) != "StringType()" else rtrim(col(c.name)).cast(c.dataType).alias(c.name) for c in target_schema])

        # write to table
        df_cast.write.mode("overwrite").saveAsTable(table_name)

    except Exception as e:
        # Print the error message
        print("********An error occurred while reading data from Parquet: {} *********".format(str(e)))

# address a bug we found with time type conversions
for table in table_list:
    date_timestamp_columns = []
    df = spark.table(table)
    for column in df.columns:
        if isinstance(df.schema[column].dataType, DateType) or isinstance(df.schema[column].dataType, TimestampType):
            date_timestamp_columns.append(column)

    for column in date_timestamp_columns:
        if spark.sql(f"select * from {table} where date({column}) = '0001-01-03'").count() != 0 :
            print(f"Running Update for table {table} and column {column}")
            spark.sql(f"UPDATE {table} SET {column} = {column} -interval 2 days WHERE date({column}) = '0001-01-03'")

 

The parameters look like:

 

{
  "table_type": "one of the options, cast, small, or large",
  "catalog_name": {
    "value": "@pipeline().parameters.catalog",
    "type": "Expression"
  },
  "schema_name": {
    "value": "@pipeline().parameters.schema",
    "type": "Expression"
  },
  "historical_data_location": {
    "value": "@concat(pipeline().parameters.adls_landing_url,pipeline().parameters.historical_data_location)",
    "type": "Expression"
  }
}

 


Validate tables

This is the activity where we validated the tables loaded to Delta by using a JDBC connection to the source (Netezza in our case). Pay special attention to the column type matching in the validate_table() function, as it was carefully developed to be able to true up values between two disparate systems. You will need to carefully think through this for your project. You can also see here we encoded the rtrim.  We had a separate notebook to address single tables for the large table thread.

Validation engine

As you will see below we validated the tables without the use of a join key by using Spark's exceptAll() DataFrame method. This was the best solution we could find given we did not have join keys. The algorithm simply checks to see whether each row in the left_df data frame matches any row in the right_df data frame:

 

left_df.exceptAll(right_df)

 

We persisted the non-matching rows for each table to a different location. 

Large table logic

We certainly don't want to pull many gigabytes or terabytes worth of data from Netezza, so we included logic such that by default the code only pulled 1 million records from Netezza. Since we were using exceptAll() as our validation engine, and the Netezza data frame was on the left, this did not pose any problems. 

Code

Here is a simplified version of the Databricks notebook used:

 

from pyspark.sql.functions import *

dbutils.widgets.text('CATALOG', '')
CATALOG = dbutils.widgets.get('CATALOG')

dbutils.widgets.text('SCHEMA', '')
SCHEMA = dbutils.widgets.get('SCHEMA')

dbutils.widgets.text('validation_schema', 'db_validation_results')
validation_schema = dbutils.widgets.get('validation_schema')

dbutils.widgets.text('type', 'tables')
type = dbutils.widgets.get('type')

dbutils.widgets.text('scope', 'secret-scope')
scope = dbutils.widgets.getArgument('scope')

dbutils.widgets.text('key', 'scope-key')
key = dbutils.widgets.getArgument('key')

# password from scope
password = dbutils.secrets.get(scope=f"{scope}", key=f"{key}")

# COMMAND ----------

# MAGIC %sql
# MAGIC USE ${CATALOG}.${SCHEMA};

# COMMAND ----------

small_tables = [...]
large_tables = [...]
cast_tables = [...]
views_list = [...]

# COMMAND ----------

def validate_table(spark, table: str, limit: int, ntz_schema: str, db_schema: str, validation_schema: str, catalog: str, password):
    print(f"Loading Netezza {table}")

    properties = {
        "url": f"jdbc:netezza://address:port/{ntz_schema}",
        "user": "netezza-user",
        "password": password,
        "driver": "org.netezza.Driver"
    }

    nz_count = spark.read.format("jdbc") \
        .options(url = properties["url"], 
        user = properties["user"],
        password = properties["password"],
        query=f"""SELECT COUNT(1) FROM admin.{table.upper()}""", 
        driver = properties["driver"]) \
        .load() \
        .collect()[0][0]
        
    db_count = spark.sql(f"SELECT COUNT(1) FROM {catalog}.{db_schema}.{table}").collect()[0][0]

    print(f"Netezza object count is {nz_count}")
    print(f"Databricks object count is {db_count}")
    if nz_count != db_count:
        f"Check failed, Netezza has {nz_count} but Databricks has {db_count}"
        return False

    # Get the column names from Netezza
    query_columns = f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{table.upper()}'"

    # Read the column names into a DataFrame
    column_df = spark.read \
        .format("jdbc") \
        .options(url = properties["url"], 
            user = properties["user"],
            password = properties["password"],
            query = query_columns, 
            driver = properties["driver"]) \
        .load()

    # Extract the column names from the DataFrame
    columns = [(row[0], row[1]) for row in column_df.collect()]

    # handle various data types
    def cast_netezza_types(column: (str, str)):
        match column[1]:
            case 'DOUBLE PRECISION':
                return column[0]
            case other if "CHAR" in other:
                return f"rtrim({column[0]}) AS {column[0]} "
            case other if other.startswith("NUMERIC"):
                return column[0]
            case _:
                return f"CAST({column[0]} AS VARCHAR(250)) AS {column[0]}"

    # Construct the query with CAST to convert all columns to string
    cast_columns = [cast_netezza_types(column) for column in columns]
    cast_query = f"SELECT {', '.join(cast_columns)} FROM {table} LIMIT {limit}"

    # read the table, casting each column to string
    netezza_df = spark.read.format("jdbc") \
        .options(url = properties["url"], 
            user = properties["user"],
            password = properties["password"],
            query = cast_query, 
            driver = properties["driver"]) \
        .load()

    # read the spark table and cast every column to string
    databricks_df = spark.read.table(f"{catalog}.{db_schema}.{table}").drop("ROWID") 

    def cast_spark_types(column_name: str):
        data_type = str(databricks_df.schema[column_name].dataType)
        match data_type:
            case 'DoubleType()':
                return col(column_name)
            case other if other.startswith("DecimalType"):
                return col(column_name)
            case 'StringType()':
                return col(column_name)
            case _:
                return col(column_name).cast("string").alias(column_name)
    

    databricks_str_df = databricks_df.select(*[
        cast_spark_types(column_name)
        for column_name in databricks_df.columns
    ])
    # compare them
    if len(netezza_df.columns) != len(databricks_str_df.columns):
        raise Exception(f"Schema difference, Netezza version has {len(netezza_df.columns)} columns but Databricks has {len(databricks_str_df.columns)}")

    nz_count = netezza_df.cache().count()
    db_count = databricks_str_df.cache().count()
    if nz_count != db_count and nz_count < limit:
        raise Exception(f"Count difference, Netezza version has {nz_count} rows but Databricks has {db_count}")  

    except_result = netezza_df.na.fill('').exceptAll(databricks_str_df.na.fill(''))
    if except_result.count() == 0:
        print("Test passed")
        netezza_df.unpersist()
        databricks_str_df.unpersist()
        return True
    else:
        diff_table = f"{catalog}.{validation_schema}.{table}_difference"
        print(f"********* Differences detected for: {table}")
        except_result.write.option("mergeSchema", "true").mode("overwrite").saveAsTable(diff_table)

        netezza_df.unpersist()
        databricks_str_df.unpersist()
        print(f"\tSee differences in {diff_table}")
        return False

# COMMAND ----------

object_list = []
ntz_schema = 'your_source_schema'
if type == 'cast':
    object_list = cast_tables    
elif type == 'small':
    object_list = small_tables    
elif type == 'large':
    object_list = large_tables  
elif type == 'views':
    object_list = views_list


# COMMAND ----------

mismatched_list = []
matched_list = []
error_table = []
for table in object_list:
    try:
        if validate_table(spark, table, 1000000, ntz_schema, SCHEMA, 'db_validation_results', CATALOG, password):
            matched_list.append(table)
        else:
            mismatched_list.append(table)
    except Exception as e:
        # Print the error message
        print(f"********An error occurred while validating {table}: {str(e)} *********")
        error_table.append(table)

 


Optimize tables

If you've followed the previous notebooks, this one works similarly but is much simpler. We had a separate notebook to address single tables for the large table thread.

 

# Databricks notebook source
dbutils.widgets.text('CATALOG', '')
CATALOG = dbutils.widgets.get('CATALOG')
dbutils.widgets.text('SCHEMA', '')
SCHEMA = dbutils.widgets.get('SCHEMA')
dbutils.widgets.text('type', 'tables')
type = dbutils.widgets.get('type')

# COMMAND ----------

# MAGIC %sql
# MAGIC USE ${CATALOG}.${SCHEMA};

# COMMAND ----------

small_tables = [...]
large_tables = [...]
cast_tables = [...]

# COMMAND ----------

object_list = []
elif type == 'cast':
    object_list = cast_tables    
elif type == 'small':
    object_list = small_tables    
elif type == 'large':
    object_list = large_tables


# COMMAND ----------

for table in object_list:
    print(f"Optimizing Table: {table}")
    spark.sql(f"OPTIMIZE {table}")

 


General Considerations

If you've read this far, I hope that you have seen at least one thing that makes you take pause and dig in. For instance:

  • If you use the Delta Lake connector in ADF, how would you handle GENERATED columns?
  • How do you validate tables from your source and target systems when their data types may not match up?
  • How can you avoid running all tables sequentially, but still give continuous resources to the largest tables?

The Great Historical Data Load pipeline you create will likely look different from this one, because of different requirements or complexity. In general keep these considerations in mind:

  • Construct the pipeline in DEV, refine it for QA, and test it once before the final run in PROD. You never know what validation issues may come up with when you have new data. 
  • The ADF ODBC driver is not perfect. Make sure to double check everything.
  • Encode any fixes to the previous point. We ran this pipeline many times and you do not want to do any manual efforts.
  • Parameterize everything you possibly can. You don't know when you might need to switch to a new CATALOG or SCHEMA. 
  • Handle your large tables separate. It was a boon to the project that we could start working with most of the tables before all the large tables were finished.
  • The table validation was not airtight for larger tables, remember we only checked up to 1 million columns. It's a trade-off.