For many use cases, the de facto method for loading tabular data into a Databricks Lakehouse from a relational database (RDBMS) is to use an ETL tool that connects over JDBC or CDC. The data is read, frequently in parallel, and transformations are applied before the data is written to Parquet or Delta tables, either directly or via a set of intermediate files in the data lake.
However, consider the following real customer scenarios:
In both the use cases above, the data can be generated as one or more ".SQL" files containing many SQL statements, which, when run in order, will reconstruct the tables and data as they appeared on the source database.
This approach is common with MySQL and PostgreSQL, which can generate these “dump” files using the mysqldump and pg_dump commands, respectively. It is often used as a simple way of creating a database backup.
The SQL statements can be contained in a single dump file containing thousands of SQL statements or split across multiple files, sometimes with one file per source table.
These files often contain some source-specific syntax that may need to be pre-processed before applying to the target database:
A trivial example .SQL file is shown below:
DROP DATABASE IF EXISTS `retail_prod`;
CREATE DATABASE `retail_prod`;
USE `retail_prod`;
CREATE TABLE `t_customer` (
`customer_id` INT NOT NULL AUTO_INCREMENT,
`customer_name` VARCHAR(50) NOT NULL,
`region_id` INT NULL,
`expired` BIT NULL,
`create_timestamp` DATETIME NULL,
CONSTRAINT PK_t_customer PRIMARY KEY (`customer_id`)
);
CREATE TABLE `t_region` (
`region_id` INT NOT NULL AUTO_INCREMENT,
CONSTRAINT PK_t_region PRIMARY KEY (`region_id`)
);
CREATE TABLE `t_order`
(
`order_id` INT NOT NULL AUTO_INCREMENT,
`customer_id` INT NULL,
`product` VARCHAR(255) NULL,
CONSTRAINT PK_t_order PRIMARY KEY (`order_id`)
);
BEGIN TRANSACTION;
INSERT INTO `t_customer` (`customer_id`, `customer_name`, `region_id`)
VALUES
(0, 'Snap', 5),
(1, 'Crackle', 26);
COMMIT;
BEGIN TRANSACTION;
INSERT INTO `t_region` (`region_id`)
VALUES
(5),
(26);
COMMIT;
BEGIN TRANSACTION;
INSERT INTO `t_order` (`order_id`, `customer_id`, `product`)
VALUES
(1, 1, 'Pop'),
(2, 1, 'Fizz');
COMMIT;
In the customer scenarios above, the business required that this data be loaded into the Databricks Lakehouse for analytics and data science use cases. There are several options to achieve this:
The high-level process for this final option is shown below:
We evaluated SQLite3 and DuckDB to see how this approach would perform. Other in-memory databases are available, but these two can easily be installed within Python without additional shell scripts.
However, DuckDB has much better options for integrating the data with Databricks:
The code snippets to load the .SQL files are described below.
This snippet loads the .SQL file from blob storage and splits it into individual SQL statements using a preconfigured separator, usually the ‘;’ character.
from azure.storage.filedatalake import DataLakeServiceClient

# Connect to the storage account and navigate to the .SQL file
service_client = DataLakeServiceClient.from_connection_string(conn_str=conn_str)
fs_client = service_client.get_file_system_client(share_name)
dir_client = fs_client.get_directory_client(directory_name)
file_client = dir_client.get_file_client(file_name)

# Download the file and split it into individual SQL statements
contents = file_client.download_file().readall()
statements = contents.decode('utf-8').split(";")
Using standard regular expressions, any unsupported statements or hard-coded names are removed from the SQL statements.
import re

# Create regular expressions to clean up the SQL statements
auto_increment_regex = re.compile(r"AUTO_INCREMENT")

# Read a SQL statement and clean it up
def process_statement(statement):
    if re.search(r"CREATE TABLE", statement):
        # Remove the unsupported AUTO_INCREMENT attribute and convert the quoting character
        statement = auto_increment_regex.sub("", statement)
        statement = statement.replace('`', '"')
    elif re.search(r"INSERT INTO", statement):
        statement = statement.replace('`', '"')
    return statement

statements = list(map(process_statement, statements))
Data types are converted if required, e.g., BIT to BOOLEAN and DATETIME to TIMESTAMP. Quoting/escape characters, such as the backtick (`) character, may need to be converted to the double quote (") character.
An example of unsupported syntax is MySQL's AUTO_INCREMENT attribute, which gives an ID column an auto-incrementing integer value.
This is not supported in DuckDB but can simply be removed, as these columns are already populated in the source database.
All the foreign key constraints can also be ignored, as they were already enforced at source and the data will immediately be exported to Delta Lake anyway.
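As an illustration, the clean-up step could be extended with further regular expressions to handle these conversions. The sketch below is an assumption about how this might look rather than the exact expressions used in the project; a robust implementation would need to account for the specific dialect of the source dump files (for example, column names that happen to contain the keyword BIT).

import re

# Illustrative additional clean-up rules (assumed, not the exact production regexes)
datetime_regex = re.compile(r"\bDATETIME\b", re.IGNORECASE)
bit_regex = re.compile(r"\bBIT\b", re.IGNORECASE)
foreign_key_regex = re.compile(
    r",?\s*(CONSTRAINT\s+\S+\s+)?FOREIGN KEY\s*\([^)]*\)\s*REFERENCES\s+\S+\s*(\([^)]*\))?",
    re.IGNORECASE)

def convert_types(statement):
    if re.search(r"CREATE TABLE", statement):
        statement = datetime_regex.sub("TIMESTAMP", statement)  # DATETIME -> TIMESTAMP
        statement = bit_regex.sub("BOOLEAN", statement)         # BIT -> BOOLEAN
        statement = foreign_key_regex.sub("", statement)        # drop foreign key clauses
    return statement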
Open a connection and cursor against the DuckDB in-memory database, and then run the statements in the same order as they arrived in the .SQL file.
import duckdb

# create an in-memory duckdb instance
connection = duckdb.connect(':memory:')

# create a cursor for this thread
cursor = connection.cursor()

# run each statement in the order it appeared in the .SQL file
for statement in statements:
    cursor.execute(statement).fetchall()
DuckDB will be running on the driver, so it will need sufficient memory to store all the data loaded from the .SQL files.
# load the table from duckdb and write out to a Delta table using Spark
def write_table(cursor, table):
    # export the DuckDB table to a Pandas DataFrame
    df = cursor.table(table).fetchdf()
    if len(df.index) == 0:
        print(f"Skipped empty table {table}")
    else:
        # write the Delta table
        spark.createDataFrame(df).write \
            .format("delta") \
            .mode("overwrite") \
            .save(f"{path}/{table}")
You can also use the write_deltalake API with DuckDB to write out the Delta tables, but this is slower than Spark for large tables as it doesn’t split the data into multiple in-memory partitions. For reference, this syntax is shown below:
write_deltalake(
    table_or_uri=f"abfss://<path>/{table}",
    data=df,
    storage_options=storage_options,
    mode="overwrite")
Record counts, checksums, and schemas were then reconciled to the source database to ensure that all values were correctly copied over (the time taken for these checks was excluded from the benchmark).
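A minimal sketch of one such check is shown below, simply comparing row counts between the in-memory DuckDB tables and the Delta tables just written. The cursor, spark and path objects are assumed to be the same ones used in the earlier snippets; the checksum and schema comparisons follow the same pattern.

# Compare row counts between a DuckDB table and the Delta table just written
def reconcile_counts(cursor, table):
    duckdb_count = cursor.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
    delta_count = spark.read.format("delta").load(f"{path}/{table}").count()
    if duckdb_count != delta_count:
        raise ValueError(f"Row count mismatch for {table}: {duckdb_count} vs {delta_count}")
    print(f"{table}: {duckdb_count} rows reconciled")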
The customer had a challenging 5-minute SLA to restore a single .SQL file containing over 300k SQL statements once every hour to provide up-to-date reports.
The table below shows the performance based on the three options listed earlier:
The load time with DuckDB is already lower than the total time required to restore the SQL statements to a separate MySQL instance and then copy the data to a table in the lakehouse using JDBC. However, for the customer use case captured above, the target SLA to load 300k statements was under 5 minutes, so the process needed further optimisation.
A brief introduction to the structure of an SQL dump file will help explain how multi-threading can be used to achieve much more impressive performance.
Typically, an SQL dump file generated with mysqldump or pg_dump is structured consistently, as seen in the SQL example earlier. DDL for all tables and foreign keys comes first, followed by multiple INSERT statements:
The DDL must run first, but all foreign key constraints, table constraints and auto-increment columns can be ignored, so there are no dependencies between tables.
In addition, any SQL dump files generated as part of a full or partial backup will only contain INSERT statements. There will be no UPDATE statements, so these INSERT statements can run in parallel, in any order, as soon as the table is created.
Once the data for a given table is loaded into DuckDB, it can be written out to a Delta table immediately - there is no need to wait for the other tables to finish loading, so this step can also run in parallel. However, this optimisation has not yet been implemented and is left as a future exercise.
This diagram shows the high-level optimised multi-threaded process, using two concurrent queues:
Each step of the process is described below:
The Azure BlobServiceClient API provides a way to load the .SQL files from blob storage in chunks:
from azure.storage.blob import BlobServiceClient

# Connect to the storage account and stream the blob in chunks
blob_service_client = BlobServiceClient.from_connection_string(conn_str)
blob_client = blob_service_client.get_blob_client(
    container=container_name,
    blob="sample-blob.txt")

stream = blob_client.download_blob()
for chunk in stream.chunks():
    # Process your data
    ...
As soon as the first chunk is loaded, it does the following:
In the background, it loads the remaining chunks from blob storage and processes each one in turn. If a statement spans two chunks, the two parts are joined back together, as sketched below.
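A minimal sketch of this chunk-processing loop is shown below. The queue names, the simple "leftover" handling for statements that span chunk boundaries, and the assumption that chunk boundaries fall on character boundaries are all illustrative rather than the exact production code; process_statement is the clean-up function defined earlier.

import re
from collections import deque

# Queues shared with the DDL and INSERT consumer threads (names assumed)
ddl_queue = deque()
insert_queue = deque()

leftover = ""  # holds any partial statement carried over from the previous chunk
for chunk in stream.chunks():
    text = leftover + chunk.decode("utf-8")
    parts = text.split(";")
    leftover = parts.pop()  # the last element may be an incomplete statement
    for statement in map(process_statement, parts):
        if re.search(r"INSERT INTO", statement):
            insert_queue.append(statement)  # consumed by the INSERT thread pool
        else:
            ddl_queue.append(statement)     # consumed by the DDL thread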
Note: although .SQL files could be read as text by Spark/Autoloader in parallel, this would result in arbitrary file splits, making it difficult to:
Next, a thread is created to process the “DDL” queue, running the DDL statements using the DuckDB SQL API immediately as they are added to the queue. This ensures that all the DDLs run first.
import time

# Run a queue of statements against a duckdb connection
def duckdb_runner(connection, queue):
    # Create a cursor for this thread
    cursor = connection.cursor()
    while True:
        try:
            # Attempt to get a statement off the queue; if the queue is empty, retry
            statement = queue.popleft()
            # If the statement is the termination marker, abort this thread
            if statement == "_END_":
                break
            # Otherwise run the statement
            cursor.execute(statement).fetchall()
        except IndexError:
            # If the queue is empty, wait 100ms and retry
            time.sleep(0.1)
        except Exception as ex:
            print(f"Statement: {statement}")
            print(ex)
    cursor.close()
Once all the DDL is run, a thread pool is created based on the number of cores available. These threads consume from the “INSERT” queue, running the INSERT statements as fast as possible against the DuckDB SQL API.
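A sketch of how this thread pool could be created with the duckdb_runner function above is shown below. The use of os.cpu_count() and the one termination marker per worker are assumptions; the markers should only be added once the chunk loader has finished producing statements.

import os
import threading

# Start one worker per available core, each draining the INSERT queue
# using the duckdb_runner function above
num_workers = os.cpu_count()
workers = [
    threading.Thread(target=duckdb_runner, args=(connection, insert_queue))
    for _ in range(num_workers)
]
for worker in workers:
    worker.start()

# Once the chunk loader has finished producing statements, add one
# termination marker per worker so each thread can exit cleanly
for _ in range(num_workers):
    insert_queue.append("_END_")

for worker in workers:
    worker.join()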
Once all the INSERT statements have run, the in-memory database will be fully populated. Another thread pool is then created that iterates over all the tables in the in-memory database, exporting each to a Pandas DataFrame as before.
The same thread then converts the Pandas DataFrame to a Spark DataFrame and writes it out as a Delta table.
# load a table from DuckDB and write it to a Delta table using Spark
def write_table(connection, table):
    # create a new cursor, required for multi-threading
    cursor = connection.cursor()
    df = cursor.table(table).fetchdf()
    if len(df.index) == 0:
        print(f"Skipped empty table {table}")
    else:
        # write the Delta table
        spark.createDataFrame(df).write \
            .format("delta") \
            .mode("overwrite") \
            .save(f"{path}/{table}")
    cursor.close()
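A sketch of driving write_table from a thread pool is shown below. The use of DuckDB's SHOW TABLES to list the tables and a ThreadPoolExecutor for the parallelism are assumptions about how this step could be wired up.

import os
from concurrent.futures import ThreadPoolExecutor

# List the tables held in the in-memory database
tables = [row[0] for row in connection.execute("SHOW TABLES").fetchall()]

# Export the tables to Delta in parallel, one write_table call per thread
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    futures = [executor.submit(write_table, connection, table) for table in tables]
    for future in futures:
        future.result()  # surface any exception raised in a worker thread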
This blog illustrates a simple way data engineers can use Databricks to quickly load SQL dump files into the Lakehouse, leveraging the speed of an in-memory DuckDB database.
The customer teams already had access to Databricks, and the only constraint was that the driver needed to be large enough to hold the in-memory database. If the database were much larger, it might be necessary to restore one table at a time and then drop it from the in-memory database after export.
With the new multi-threaded process, the performance is substantially faster than techniques that involve standing up a MySQL instance: