Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
thewizard
Databricks Employee

For many use cases, the de facto method for loading tabular data into a Databricks Lakehouse from a relational database (RDBMS) uses an ETL tool to connect using JDBC or CDC. The data is read, frequently in parallel; transformations are applied before writing it either directly to the data lake or via a set of intermediate files, where it will be loaded into Parquet or Delta tables.

thewizard_0-1729793252772.png

However, consider the following real customer scenarios:

  • An analytics team wants to migrate data in a legacy on-premises MySQL database used by a decommissioned LOB system, which has been exported using mysqldump. The database could be restored to a cloud MySQL database, but this would add delay and complexity.
  • A closed-source third-party vendor system, behind a firewall, does not support API access or direct JDBC connections to extract data. The vendor only provides extracts of the data as files containing SQL statements. These extracts must be loaded into the Databricks Lakehouse within 5 minutes to support hourly reporting updates.

SQL Dump Files

In both the use cases above, the data can be generated as one or more ".SQL" files containing many SQL statements, which, when run in order, will reconstruct the tables and data as they appeared on the source database. 

This approach is common with MySQL and PostgreSQL, which can generate these “dump” files using the mysqldump and pg_dump commands, respectively. It is often used as a simple way of creating a database backup.

The SQL statements can be contained in a single dump file containing thousands of SQL statements or split across multiple files, sometimes with one file per source table. 

These files often contain some source-specific syntax that may need to be pre-processed before applying to the target database:

  • Data types may use different names, such as BIT, DATETIME, VARCHAR
  • Quoted identifiers, such as table and column names, may be wrapped in double quotes, square brackets or backticks
  • Generated columns, such as auto-incrementing integers
  • Foreign key constraints

A trivial example .SQL file is shown below:

 

DROP DATABASE IF EXISTS `retail_prod`;
CREATE DATABASE `retail_prod`;
USE `retail_prod`;


CREATE TABLE `t_customer` (
   `customer_id` INT NOT NULL AUTO_INCREMENT,
   `customer_name` VARCHAR(50) NOT NULL,
   `region_id` INT NULL,
   `expired` BIT NULL,
   `create_timestamp` DATETIME NULL,
   CONSTRAINT PK_t_customer PRIMARY KEY (`customer_id`)
);


CREATE TABLE `t_region` (
   `region_id` INT NOT NULL AUTO_INCREMENT,
   CONSTRAINT PK_t_region PRIMARY KEY (`region_id`)
);


CREATE TABLE `t_order`
(
   `order_id` INT NOT NULL AUTO_INCREMENT,
   `customer_id` INT NULL,
   `product` VARCHAR(255) NULL,
   CONSTRAINT PK_t_order PRIMARY KEY (`order_id`)
);


BEGIN TRANSACTION;
INSERT INTO `t_customer` (`customer_id`, `customer_name`, `region_id`)
VALUES
(0, 'Snap', 5),
(1, 'Crackle', 26);
COMMIT;


BEGIN TRANSACTION;
INSERT INTO `t_region` (`region_id`)
VALUES
(5),
(26);
COMMIT;


BEGIN TRANSACTION;
INSERT INTO `t_order` (`order_id`, `customer_id`, `product`)
VALUES
(1, 1, 'Pop'),
(2, 1, 'Fizz');
COMMIT;

 

Importing Dump Files into a Lakehouse

In the customer scenarios above, the business required that this data be loaded into the Databricks Lakehouse for analytics and data science use cases. There are several options to achieve this:

  1. Running the SQL statements directly against a SQL API such as DBSQL
    Although very straightforward, this is slow because these files typically contain many tiny SQL statements. Even with a 10 ms query execution time, a large dump file could take hours to load, so a more efficient approach is needed.
  2. Restore these SQL files to a temporary RDBMS instance first
    Create a cloud RDBMS instance and apply the SQL files to that instance before exporting the data into the cloud data warehouse. This wasn’t an acceptable solution to the customer, as it (a) required additional infrastructure and (b) was still slow to restore a large SQL dump file and then export the data to the data lake.
  3. Restore the statements to an in-memory database on a Databricks cluster, then export
    To avoid additional infrastructure, we can use a database that supports an in-memory mode, such as SQLite or DuckDB. This reduces both the time to load the SQL statements and the time to export the data to the lake, as less data has to travel over the network. 

The high-level process for this final option is shown below:

thewizard_1-1729793252761.png

Choosing an in-memory database

We evaluated SQLite3 and DuckDB to see how this approach would perform. Other in-memory databases are available, but these two can easily be installed within Python without additional shell scripts. 

However, DuckDB has much better options for integrating the data with Databricks:

  • Delta Table writer support, so no additional tools are needed to write to the Delta Lake
  • Arrow support so that the data in DuckDB can be efficiently loaded as a Spark DataFrame for further downstream transformations
  • SQL syntax is very similar to MySQL SQL syntax, so most operations are supported

End-to-end process

The code snippets to load the .SQL files are described below.

Read .SQL files from blob storage

This snippet loads the .SQL file from blob storage and splits it into individual SQL statements using a preconfigured separator, usually the ‘;’ character.

 

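from azure.storage.filedatalake import DataLakeServiceClient

# Connect to the storage account and locate the .SQL file
service_client = DataLakeServiceClient.from_connection_string(conn_str=conn_str)
fs_client = service_client.get_file_system_client(share_name)
dir_client = fs_client.get_directory_client(directory_name)
file_client = dir_client.get_file_client(file_name)

# Download the whole file into memory as bytes
contents = file_client.download_file().readall()

# Split into individual statements, dropping empty fragments
statements = [s for s in contents.decode('utf-8').split(";") if s.strip()]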
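
Splitting on every ‘;’ works for simple dumps, but it would also cut a statement in two if a semicolon appears inside a quoted string value. The following is a minimal sketch of a quote-aware splitter, not the code used for the customer; the function name `split_sql_statements` is hypothetical, and it assumes MySQL-style backslash escapes and does not handle SQL comments.

```python
def split_sql_statements(script: str, separator: str = ";") -> list[str]:
    """Split a SQL script on the separator, ignoring separators inside quotes."""
    statements = []
    current = []
    quote = None  # the quote character we are currently inside, if any
    i = 0
    while i < len(script):
        ch = script[i]
        if quote:
            current.append(ch)
            if ch == "\\" and i + 1 < len(script):
                # Backslash escape: copy the next character verbatim
                current.append(script[i + 1])
                i += 1
            elif ch == quote:
                # Closing quote (a doubled quote simply reopens on the next char)
                quote = None
        elif ch in ("'", '"', "`"):
            quote = ch
            current.append(ch)
        elif ch == separator:
            statement = "".join(current).strip()
            if statement:
                statements.append(statement)
            current = []
        else:
            current.append(ch)
        i += 1

    # Keep a trailing statement that has no final separator
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements


print(split_sql_statements("INSERT INTO t VALUES ('a;b'); SELECT 1;"))
```

The character-by-character loop is slower than `str.split`, so it is only worth using when the data is known to contain the separator inside string values.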

 

Pre-process SQL statements

Using standard regular expressions, any unsupported statements or hard-coded names are removed from the SQL statements.

 

import re

# Create regular expressions to clean up the SQL statements
auto_increment_regex = re.compile(r"AUTO_INCREMENT")

# Read a SQL statement and clean it up
def process_statement(statement):
  if re.search(r"CREATE TABLE", statement):
    # AUTO_INCREMENT is not supported by DuckDB, so remove it
    statement = auto_increment_regex.sub("", statement)
    # DuckDB quotes identifiers with double quotes rather than backticks
    statement = statement.replace('`', '"')

  elif re.search(r"INSERT INTO", statement):
    statement = statement.replace('`', '"')

  return statement

statements = [process_statement(s) for s in statements]

 

Data types are converted if required, e.g., BIT to BOOLEAN and DATETIME to TIMESTAMP. Quoting/escape characters, such as the backtick (`), may need to be converted to double quotes (").

An example of an unsupported feature is the MySQL AUTO_INCREMENT attribute, which gives an ID column an auto-incrementing integer value.

This is not supported in DuckDB but can simply be removed, as these columns are already populated in the data exported from the source database.

All the foreign key constraints can be ignored, as the data already has the constraints checked at source, and the data will immediately be exported to Delta Lake anyway.
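
The clean-up rules described in this section could be combined as in the sketch below. This is an illustration under stated assumptions rather than the code used for the customer: the names `TYPE_MAP`, `clean_create_table` and `clean_statement` are hypothetical, the type mapping covers only the two examples mentioned above, and the foreign key pattern assumes constraints are declared inline in the CREATE TABLE statement.

```python
import re

# Assumed MySQL -> DuckDB type renames; extend for your own dump files
TYPE_MAP = {"BIT": "BOOLEAN", "DATETIME": "TIMESTAMP"}

AUTO_INCREMENT_RE = re.compile(r"\s+AUTO_INCREMENT\b", re.IGNORECASE)
# Inline foreign key clause, including any trailing ON DELETE / ON UPDATE action
FOREIGN_KEY_RE = re.compile(
    r",\s*(?:CONSTRAINT\s+\S+\s+)?FOREIGN\s+KEY\s*\([^)]*\)"
    r"\s*REFERENCES\s+\S+\s*\([^)]*\)[^,)]*",
    re.IGNORECASE,
)
# Only match type names preceded by whitespace, so quoted identifiers are skipped
TYPE_RE = re.compile(r"(?<=\s)(" + "|".join(TYPE_MAP) + r")\b", re.IGNORECASE)


def clean_create_table(statement: str) -> str:
    statement = AUTO_INCREMENT_RE.sub("", statement)
    statement = FOREIGN_KEY_RE.sub("", statement)
    statement = TYPE_RE.sub(lambda m: TYPE_MAP[m.group(1).upper()], statement)
    return statement.replace("`", '"')


def clean_statement(statement: str) -> str:
    head = statement.lstrip().upper()
    if head.startswith("CREATE TABLE"):
        return clean_create_table(statement)
    if head.startswith("INSERT INTO"):
        # Only convert backticks before VALUES, leaving the data itself untouched
        match = re.search(r"\bVALUES\b", statement, re.IGNORECASE)
        cut = match.start() if match else len(statement)
        return statement[:cut].replace("`", '"') + statement[cut:]
    return statement


print(clean_statement("CREATE TABLE `t` (`id` INT NOT NULL AUTO_INCREMENT, `flag` BIT NULL)"))
```

Restricting the type and backtick rewrites to the parts of a statement that hold identifiers avoids accidentally changing string values in the INSERT data.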

Create a DuckDB in-memory database

Open a connection and cursor against the DuckDB in-memory database, and then run the statements in the same order as they arrived in the .SQL file.

 

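import duckdb

# create an in-memory duckdb instance
connection = duckdb.connect(':memory:')

# create a cursor for this thread
cursor = connection.cursor()

# run the statements in file order, skipping empty fragments left by the split
for statement in statements:
  if statement.strip():
    cursor.execute(statement).fetchall()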

 

DuckDB will be running on the driver, so it will need sufficient memory to store all the data loaded from the .SQL files.

Convert the Pandas DataFrame to Spark and write

 

# load the table from duckdb and write out to a Delta table using Spark
# (`spark` is the notebook's Spark session; `path` is the target root location)
def write_table(cursor, table):
  # export the DuckDB table to a Pandas DataFrame
  df = cursor.table(table).fetchdf()

  if len(df.index) == 0:
    print(f"Skipped empty table {table}")
  else:
    # write the Delta table
    spark.createDataFrame(df).write \
      .format("delta") \
      .mode("overwrite") \
      .save(f"{path}/{table}")

 

You can also use the write_deltalake function from the deltalake Python package to write the DataFrame exported from DuckDB directly to a Delta table, but this is slower than Spark for large tables as it doesn’t split the data into multiple in-memory partitions. For reference, this syntax is shown below:

 

from deltalake import write_deltalake

write_deltalake(
     table_or_uri=f"abfss://<path>/{table}",
     data=df,
     storage_options=storage_options,
     mode="overwrite")

 

Record counts, checksums, and schemas were then reconciled against the source database to ensure that all values were correctly copied over (the time taken for these checks was excluded from the benchmark).

Meeting the customer’s SLA

The customer had a challenging 5-minute SLA to restore a single .SQL file containing over 300k SQL statements once every hour to provide up-to-date reports.
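
To put the SLA in perspective, the arithmetic below works out the time budget per statement. It is a rough illustration only: it treats the file as exactly 300,000 statements and reuses the 10 ms execution time mentioned earlier as an assumed per-statement latency; the function names are hypothetical.

```python
def per_statement_budget_ms(statement_count: int, sla_seconds: float) -> float:
    """Average time available per statement if the whole load must fit in the SLA."""
    return sla_seconds / statement_count * 1000.0


def sequential_load_minutes(statement_count: int, latency_ms: float) -> float:
    """Total time if statements run one after another at a fixed latency."""
    return statement_count * latency_ms / 1000.0 / 60.0


# Budget per statement for 300,000 statements in 5 minutes: about 1 ms
print(per_statement_budget_ms(300_000, 5 * 60))

# Running the same statements one by one at an assumed 10 ms each: about 50 minutes
print(sequential_load_minutes(300_000, 10))
```

Because the budget also has to cover reading the file and writing the Delta tables, the time actually available for each statement is lower still.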

The table below shows the performance based on the three options listed earlier:

thewizard_3-1729795002180.png

The performance with DuckDB is already less than the total time required to restore the SQL statements to a separate MySQL instance and then copy the data to a table in the lakehouse using JDBC. However, for the customer use case captured above, the target SLA to load 300k statements was under 5 minutes, so the process needed further optimisation.

Applying multi-threading

A brief introduction to the structure of an SQL dump file will help explain how multi-threading can be used to achieve much more impressive performance.

Typically, an SQL dump file generated with mysqldump or pg_dump is structured consistently, as seen in the SQL example earlier. DDL for all tables and foreign keys comes first, followed by multiple INSERT statements:

thewizard_1-1729794903712.png

The DDL must run first, but all foreign key constraints, table constraints and auto-increment columns can be ignored, so there are no dependencies between tables.

In addition, any SQL dump files generated as part of a full or partial backup will only contain INSERT statements. There will be no UPDATE statements, so these INSERT statements can run in parallel, in any order, as soon as the table is created.

Once the data is loaded to a given table in DuckDB, it can be written out to a Delta table immediately; there is no requirement to wait for other tables to finish loading, so this too can run in parallel. However, this optimisation has not yet been implemented and is left as a future exercise.

This diagram shows the high-level optimised multi-threaded process, using two concurrent queues:

thewizard_2-1729793252758.png

Each step of the process is described below:

Read from blob storage in chunks

The Azure BlobServiceClient API provides a way to load the .SQL files from blob storage in chunks:

 

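from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient.from_connection_string(conn_str)
blob_client = blob_service_client.get_blob_client(
  container=container_name, 
  blob="sample-blob.txt")
stream = blob_client.download_blob()
for chunk in stream.chunks():
  # Each chunk is a bytes object; process your data here
  pass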

 

As soon as the first chunk is loaded, the loader does the following:

  • Splits the chunk into individual SQL statements using the preconfigured separator
  • Runs the pre-processing step to clean up the statements
  • If the statement is DDL, adds it to a concurrent queue reserved for DDL
  • Otherwise, adds it to a separate concurrent queue containing all the INSERT statements

In the background, it loads the remaining chunks from blob storage and processes each one in turn. If a statement spans two chunks, the two parts are joined back together.
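
The chunk handling described above could look something like the sketch below. It is a simplified illustration, not the customer implementation: `route_chunks` and `DDL_PREFIXES` are hypothetical names, the list of DDL keywords is an assumption, the pre-processing step is omitted, and the split uses the same simple separator approach as the earlier snippet. Any iterable of bytes works as input, such as the chunks returned by the blob download stream.

```python
import codecs
from collections import deque
from typing import Iterable

# Assumed set of keywords that mark a statement as DDL
DDL_PREFIXES = ("CREATE", "DROP", "ALTER")


def _route(statement: str, ddl_queue: deque, insert_queue: deque) -> None:
    statement = statement.strip()
    if not statement:
        return
    if statement.upper().startswith(DDL_PREFIXES):
        ddl_queue.append(statement)
    else:
        insert_queue.append(statement)


def route_chunks(chunks: Iterable[bytes], ddl_queue: deque,
                 insert_queue: deque, separator: str = ";") -> None:
    # The incremental decoder copes with multi-byte characters split across chunks
    decoder = codecs.getincrementaldecoder("utf-8")()
    carry = ""  # incomplete statement left over from the previous chunk

    for chunk in chunks:
        text = carry + decoder.decode(chunk)
        # Everything before the last separator is complete; the rest is carried over
        *complete, carry = text.split(separator)
        for statement in complete:
            _route(statement, ddl_queue, insert_queue)

    # Flush the decoder and any final statement that lacks a trailing separator
    carry += decoder.decode(b"", final=True)
    _route(carry, ddl_queue, insert_queue)


ddl_queue, insert_queue = deque(), deque()
route_chunks(
    [b"CREATE TABLE t (id INT); INSERT INTO t VAL", b"UES (1); INSERT INTO t VALUES (2);"],
    ddl_queue, insert_queue)
print(list(ddl_queue), list(insert_queue))
```

Carrying the unfinished tail of each chunk forward is what preserves statement order and stitches together statements that straddle a chunk boundary.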

Note: although .SQL files could be read as text by Spark/Autoloader in parallel, this would result in arbitrary file splits, making it difficult to:

  • Maintain ordering of SQL statements
  • Stitch together SQL statements that overlap two files

Run the SQL statements against DuckDB in parallel

Next, a thread is created to process the “DDL” queue, running the DDL statements using the DuckDB SQL API immediately as they are added to the queue. This ensures that all the DDLs run first. 

 

import time

# Run a queue of statements against a duckdb connection
# (`queue` is a collections.deque shared with the thread that fills it)
def duckdb_runner(connection, queue):
  # Create a cursor for this thread
  cursor = connection.cursor()

  while True:
    try:
      # Attempt to get a statement off the queue; if the queue is empty, retry
      statement = queue.popleft()

      # If the statement is the termination marker, stop this thread
      if statement == "_END_":
        break

      # Otherwise run the statement
      cursor.execute(statement).fetchall()
    except IndexError:
      # If the queue is empty, wait 100ms and retry
      time.sleep(0.1)
    except Exception as ex:
      # Log the failing statement and carry on with the next one
      print(f"Statement: {statement}")
      print(ex)

  cursor.close()

 

Once all the DDL is run, a thread pool is created based on the number of cores available. These threads consume from the “INSERT” queue, running the INSERT statements as fast as possible against the DuckDB SQL API.
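
One way to start and stop such a pool is sketched below. This is an assumption-laden illustration rather than the original code: `start_workers`, `finish_workers` and `make_executor` are hypothetical names, and one termination marker is queued per worker so that every thread sees its own marker and exits. With DuckDB, `make_executor` could be something like `lambda: connection.cursor().execute`, so that each thread gets its own cursor.

```python
import os
import threading
import time
from collections import deque
from typing import Callable, List

END = "_END_"  # termination marker, matching the runner above


def _consume(make_executor: Callable[[], Callable[[str], object]], queue: deque) -> None:
    execute = make_executor()  # created inside the thread, e.g. a per-thread cursor
    while True:
        try:
            statement = queue.popleft()
        except IndexError:
            time.sleep(0.01)  # queue is empty for now; wait briefly and retry
            continue
        if statement == END:
            break
        try:
            execute(statement)
        except Exception as ex:
            print(f"Statement failed: {statement}\n{ex}")


def start_workers(make_executor, queue: deque, workers: int = 0) -> List[threading.Thread]:
    workers = workers or os.cpu_count() or 1
    threads = [threading.Thread(target=_consume, args=(make_executor, queue))
               for _ in range(workers)]
    for thread in threads:
        thread.start()
    return threads


def finish_workers(queue: deque, threads: List[threading.Thread]) -> None:
    # Call once the producer has queued its last statement
    queue.extend([END] * len(threads))
    for thread in threads:
        thread.join()


# Usage with a stand-in executor that just records each statement
seen = []
insert_queue = deque(f"INSERT {i}" for i in range(100))
pool = start_workers(lambda: seen.append, insert_queue, workers=4)
finish_workers(insert_queue, pool)
print(len(seen))
```

The same two functions could run the DDL queue with a single worker first, and then the INSERT queue with one worker per core.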

Export the DuckDB tables in parallel

Once all the INSERT statements have run, the in-memory database is fully populated. Another thread pool is created that iterates over all tables in the in-memory database, exporting each to a Pandas DataFrame as above.

The same thread then converts the Pandas DataFrame to a Spark DataFrame and writes it out as a Delta table.

 

# load table from DuckDB, write to Delta table using Spark
def write_table(connection, table):
  # create a new cursor, required for multi-threading
  cursor = connection.cursor()

  try:
    # export the DuckDB table to a Pandas DataFrame
    df = cursor.table(table).fetchdf()

    if len(df.index) == 0:
      print(f"Skipped empty table {table}")
    else:
      # write the Delta table
      spark.createDataFrame(df).write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{path}/{table}")
  finally:
    # always release the cursor, even if the export fails
    cursor.close()
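
The thread pool that drives this function could be built with the standard library, as in the sketch below. The name `export_tables` is hypothetical and the default worker count is an arbitrary assumption. The list of table names could come from a query such as `SHOW TABLES` against DuckDB, and `write_fn` could be `functools.partial(write_table, connection)`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable


def export_tables(tables: Iterable[str], write_fn: Callable[[str], None],
                  max_workers: int = 4) -> Dict[str, Exception]:
    """Run write_fn for every table in parallel and return any failures by table."""
    failures: Dict[str, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(write_fn, table): table for table in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                future.result()
            except Exception as ex:
                # Record the failure but let the other tables finish
                failures[table] = ex
    return failures


# Usage with a stand-in writer that only prints the table name
failed = export_tables(["t_customer", "t_region", "t_order"],
                       lambda table: print(f"exported {table}"))
print(failed)
```

Collecting failures instead of raising on the first error means one problem table does not stop the remaining tables from being exported.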

 

Performance with multi-threading

This blog illustrates a simple way data engineers can use Databricks to quickly load SQL dump files into the Lakehouse, leveraging the speed of an in-memory DuckDB database.

The customer teams already had access to Databricks, and the only constraint was that the driver needed to be large enough to hold the in-memory database. If the database were much larger, it might be necessary to load and export one table at a time, dropping each table from the in-memory database once it has been written.

With the new multi-threaded process, the performance is substantially faster than techniques that involve standing up a MySQL instance:

thewizard_1-1729872458504.png

thewizard_0-1729794828040.png