I recently led a large migration from on-premise Netezza to Azure Databricks for a major North American retailer. As part of this, we needed to:
That last point seems like an afterthought, "oh, I'll just make sure it works, no biggie". But if you're reading this you understand data validation is, by far, the most challenging aspect of a migration between two systems. In this two-part blog series, you'll learn:
Some questions we will explore in this series are:
In this post we'll be focusing on curating robust and thorough test data. How can we get comprehensive data to run our tests with? Stay tuned for part two where we will delve into approaches and automation for creating and running tests.
When your team is migrating script code and pipeline code to Databricks, there are three main steps:
While the first two steps don't necessarily require any data, the third step does. To ensure that a script is converted correctly you typically want to make sure:
To do this, for each script and pipeline being converted, we need:
So, if you are testing a pipeline that has three scripts, and the three scripts all read and then update the same two tables, you will need three copies of each table for pre data (total of six pre tables), and three copies of each table for post data (also six tables). Note that the pre data for one pipeline may be the post data for another. We will use this later on to save space on disk.
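As a quick sanity check on that arithmetic, here is a tiny sketch (with made-up script and table names; the real naming convention comes later in this post):

# Three scripts, each reading and updating the same two tables (hypothetical names)
scripts = ["script_1", "script_2", "script_3"]
tables = ["table_a", "table_b"]

pre_copies = [f"{s}__{t}__pre" for s in scripts for t in tables]
post_copies = [f"{s}__{t}__post" for s in scripts for t in tables]
print(len(pre_copies), len(post_copies))  # 6 6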
The solution we propose to generate the pre and post data is shown below:
While the process seems simple, there are a number of complexities which can crop up:
Because of complexities like these, the solution you come up with will be non-trivial and bespoke to your specific source architecture. For instance, the way you handle this if you're migrating from Netezza SQL scripts triggered by bash scripts (our case) will be fairly different from the solution if you are migrating from SQL Server triggered by stored procedures. Know that you are signing up to write code and think through issues!
The first step is to rewrite the pipeline scripts such that they persist out the pre and post data when they get run in the source system. Going forward, I'll assume these are written in some SQL dialect.
The process here is to:
We need to scan each script and figure out which tables it touches. This is not trivial. We leveraged the Python package sqlglot for this, which can break SQL syntax down into the underlying AST from which the tables are accessible. When using sqlglot you need to specify a dialect. Unfortunately for us, at the time of our project Netezza was not one of the 20 dialects supported. In spite of this, we were able to get it working by setting the dialect to postgres, which Netezza was based on, and then writing a cleaner that used regex patterns to omit parts of the code that were idiomatic to Netezza. Here is the Python code used to parse a single SQL query:
import re

from sqlglot import parse_one, exp


def extract_query_tables(query: str, tables: set[str],
                         truncates: set[str],
                         temp_tables: set[str],
                         debug=False):

    def is_temp_table(table_ast, depth: int = 0) -> bool:
        # walk up the AST (at most three levels) looking for a CREATE TEMPORARY statement
        if depth == 3:
            return False
        elif table_ast is None:
            return False
        elif table_ast.sql().startswith("CREATE TEMPORARY"):
            return True
        else:
            return is_temp_table(table_ast.parent, depth + 1)

    def fmt_table(table_name: str) -> str:
        # remove the database if it exists and make lower case
        return re.sub(r"^.*[.]", "", table_name).lower()

    # sometimes there is a random ; in there which causes problems
    if query != ";":
        try:
            ast = parse_one(query, read='postgres')
        except Exception as e:
            print("Parse failed for SQL:")
            print(query)
            raise e
        if ast.name.lower() == "truncate":
            truncate_table = ast.sql().split(" ")[-1]
            truncates.add(fmt_table(truncate_table))
            tables.add(fmt_table(truncate_table))
            if debug:
                print(f"Added truncate table {truncate_table}")
        else:
            for table in ast.find_all(exp.Table):
                if is_temp_table(table):
                    temp_tables.add(fmt_table(table.name))
                elif table.name != "flat_file":
                    name_scrub = fmt_table(table.name)
                    tables.add(name_scrub)
def clean_sql_scripts(sql_contents_raw: str) -> str:
    # because we are using sqlglot, and sqlglot does not have a pre-built dialect
    # for Netezza, we had to make a decision: either write a dialect parser for sqlglot
    # or just get rid of anything in the script that is not generic SQL. This function
    # addresses the second option.
    sql_contents = sql_contents_raw
    replacements = [
        # get rid of \time
        (r"\\time", ""),
        # clean up character varying
        (r" varying", ""),
        # get rid of generate statistics
        (r"generate statistics on \w+ ?;?", ""),
        ...
        (r" byteint\n", " int\n")
    ]
    for pattern, replacement in replacements:
        # replace each matching pattern with its (usually empty) replacement
        sql_contents = re.sub(pattern, replacement, sql_contents, flags=re.MULTILINE)
    return sql_contents
The first function, extract_query_tables, takes a query and three sets and adds items to the sets based on what it finds. It does a few other important things:
- It handles TEMPORARY tables by ignoring them. We don't need that information when building the pre and post state.
- It normalizes table names, whether they appear as database.table or just table, using the fmt_table function.
- It skips a stray ;, which sometimes shows up as its own "query" and breaks parsing.

The second function, clean_sql_scripts, is run before extract_query_tables and uses regex to scrub Netezza-specific syntax. This was built through trial and error and will likely need to be re-created for future projects. Alternatively, you could write a Netezza dialect for sqlglot.
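Assuming the two helpers above are in scope, here is a quick, hypothetical check of what they produce (the table names are made up):

all_tables: set[str] = set()
truncated: set[str] = set()
temp_tables: set[str] = set()

# clean the query first, then extract the tables it touches
sql = clean_sql_scripts("INSERT INTO admin.table_a SELECT * FROM admin.table_b;")
extract_query_tables(sql, all_tables, truncated, temp_tables)

print(sorted(all_tables))  # ['table_a', 'table_b']
print(truncated)           # set()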
Parsing modified tables could also be done using the ASTs, but since they are fairly easy to spot, the easier option is to use a simple regex pattern:
r'(?:\binsert\s+into|\bupdate|\bdelete\s+from)\s+([^\s\(\)]+)'
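For instance, run against a hypothetical snippet, the pattern captures the target table of each modifying statement:

import re

table_pattern = r'(?:\binsert\s+into|\bupdate|\bdelete\s+from)\s+([^\s\(\)]+)'
sample = """
UPDATE admin.table_a SET col = 1;
INSERT INTO table_b SELECT * FROM table_a;
DELETE FROM admin.table_c WHERE col = 2;
"""
print(re.findall(table_pattern, sample, flags=re.IGNORECASE))
# ['admin.table_a', 'table_b', 'admin.table_c']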
Here is the code we used to do this; it calls the two functions from the previous section:
import re

import sqlparse


def extract_script_tables(sql: str,
                          path: str = "../netezza-code/sql/",
                          debug=False) -> dict[str, set[str]]:
    with open(path + sql) as sql_script:
        sql_contents_raw = sql_script.read()
    sql_contents = clean_sql_scripts(sql_contents_raw)
    if debug:
        print("Printing modified script SQL:")
        print(sql_contents)
    queries: list[str] = sqlparse.split(sql_contents)
    # find all table names in each query
    table_names_all: set[str] = set()
    truncate: set[str] = set()
    temp_tables: set[str] = set()
    for query in queries:
        extract_query_tables(query, table_names_all, truncate, temp_tables)
    with open(path + sql) as sql_script:
        filtered_lines = []
        for line in sql_script:
            # Skip lines that start with '--' or '/*'
            if line.startswith('--') or line.startswith('/*'):
                continue
            # Append the line to the filtered lines list
            filtered_lines.append(line)
    filtered_script = ''.join(filtered_lines)
    table_pattern = r'(?:\binsert\s+into|\bupdate|\bdelete\s+from)\s+([^\s\(\)]+)'
    # Find all matches of table names in the script
    table_names_raw: set[str] = set(re.findall(table_pattern, filtered_script, flags=re.IGNORECASE))
    table_names_update: set[str] = {item.lower().removeprefix("admin.") for item in table_names_raw}
    if debug:
        print(f"Regex found updated tables: {table_names_update}")
    # remove the temp tables
    table_names_all.difference_update(temp_tables)
    table_names_update.difference_update(temp_tables)
    # add update tables to all tables if not there
    table_names_all.update(table_names_update)
    return {
        'all': table_names_all,
        'updated': table_names_update,
        'truncate': truncate
    }
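Before breaking down what it does, here is a hypothetical invocation (the file name is made up and must exist under the given path):

# Hypothetical call; 'load_sales.sql' is a made-up file name under the default path
tables = extract_script_tables("load_sales.sql", path="../netezza-code/sql/")
print(tables["all"])       # every table the script reads or writes
print(tables["updated"])   # tables modified by INSERT/UPDATE/DELETE
print(tables["truncate"])  # tables the script truncates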
This function will:
- Run clean_sql_scripts to get rid of anything that is Netezza-specific syntax.
- Split the script into individual queries (with sqlparse.split()) which can be fed to extract_query_tables.
- Maintain three sets: one for "all tables", one for tables that get truncated, and one for temp tables; we need the latter two for additional processing.
- Use the regex pattern to catch updated tables in case sqlglot didn't pick them up, and keep a separate set just for updated tables.

Now we need to add a section at the top of each SQL script to persist the pre state of the script, and a section at the bottom to persist the post state. Augmenting the scripts would be easy if we had infinite space to work with (or very little data): we would just save out a copy of each table as it is at that time. For example, let's say you have this script:
# original script
INSERT INTO table_a
SELECT * FROM table_b
You could change it to look like this (naive solution):
# persist the pre state (naive solution)
CREATE TABLE some_job__some_script__table_a__pre AS
SELECT * FROM table_a
CREATE TABLE some_job__some_script__table_b__pre AS
SELECT * FROM table_b
# original script
INSERT INTO table_a
SELECT * FROM table_b
# persist the post state (only the changed table)
CREATE TABLE some_job__some_script__table_a__post AS
SELECT * FROM table_a
Unfortunately, this would mean that if some large fact table were read by 99 scripts, it would be persisted 99 times just to generate pre test data. And how many of those copies would be exact duplicates of the same data? This does not scale!
What's the solution? Use VIEWs that reference previously persisted post tables instead. The script above would turn into:
# persist the pre state (optimal solution)
CREATE VIEW some_job__some_script__table_a__pre AS
SELECT * FROM table_a
CREATE VIEW some_job__some_script__table_b__pre AS
SELECT * FROM some_job__previous_script__table_b__post
# original script
INSERT INTO table_a
SELECT * FROM table_b
# persist the post state (only the changed table)
CREATE TABLE some_job__some_script__table_a__post AS
SELECT * FROM table_a
Here, table_a is being used for the first time in the run order, while table_b was previously updated by the script previous_script in the same job.
If we do it this way, then all we have to do to load it into Databricks is batch-load all the tables ending with the suffix __post and then materialize all the VIEWs. We'll go over that later on.
So you might be scratching your head: how do you determine which post table to reference in a given VIEW? The solution is to walk the scripts in run order and keep a hashmap of "seen" tables, mapping each table to the most recently persisted post table.
Think about it this way: the first script of the first job will get all its tables from the snapshot taken, not a post table, since none exist yet. When a script updates a table, it adds that table to the hashmap of seen tables, keyed to the name of the post table it persists it out as. Subsequent scripts get their list of tables and check whether each one is already in the seen hashmap. If so, they reference the post table recorded there and, if they update that table, replace the entry with their own post table name; otherwise they just add their own post table name.
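Here is a minimal, stand-alone sketch of that bookkeeping (the job, script, and table names are made up):

# Minimal sketch of the "seen tables" bookkeeping described above (names are hypothetical)
seen_tables: dict[str, str] = {}

def pre_reference(table: str) -> str:
    # first use in the run order -> read straight from the snapshot table,
    # otherwise read from the most recently persisted post table
    return seen_tables.setdefault(table, table)

def register_post(job: str, script: str, table: str) -> None:
    # after a script updates a table, later scripts should reference its post copy
    seen_tables[table] = f"{job}__{script}__{table}__post"

print(pre_reference("table_b"))  # table_b  (no post table exists yet)
register_post("some_job", "previous_script", "table_b")
print(pre_reference("table_b"))  # some_job__previous_script__table_b__post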
Here is the function that does this:
def process_sequence(job_scripts: list[tuple[str, list[tuple[str, list[str]]]]],
                     in_dir: str,
                     out_dir: str = "converted_sql/"):
    # Analyze each script one-at-a-time
    # 1. For each script, categorize tables we encounter into three non-exclusive buckets:
    #    - update tables: tables that are changed during the script
    #    - all tables: tables that are read or updated during the script
    #    - truncate tables: tables that should be truncated before saving
    # 2. After a script is parsed and the tables are categorized, we check whether the 'all' tables exist in our
    #    'seen_tables' dictionary. If so, we grab the value for it from the dict (call it 'v'); if not, we add it to
    #    the dictionary with its current name as 'v'.
    # 3. We create VIEWs at the top of the script for all 'all' tables, with the qualified name in the format
    #    script__table__pre, pointing to the v table.
    # 4. We process the update tables. At the bottom of the script we create a new table for each of the tables
    #    modified during the script's run. The saved name of the table is used to populate the corresponding value
    #    in the seen tables dictionary such that future usages will reference that table.
    seen_tables = {}
    with open(out_dir + "job_scripts.csv", "w") as script_tracker:
        script_tracker.write("job,script,script_num\n")
        for job, scripts in job_scripts:
            print("Converting for job: " + job)
            for i, script_w_nzload_table in enumerate(scripts):
                script, nzload_tables = script_w_nzload_table
                script_tables: dict[str, set[str]] = extract_script_tables(script, in_dir)
                # add in the nzload tables for the script
                script_tables["nzload"] = set(nzload_tables)
                # update the other sets
                if len(nzload_tables) > 0:
                    # since 'updated' and 'all' may be empty
                    all_set = script_tables.setdefault("all", set())
                    all_set.update(nzload_tables)
                    update_set = script_tables.setdefault("updated", set())
                    update_set.update(nzload_tables)
                    print(f"Updated {script} to include nzload tables {', '.join(nzload_tables)}")
                # get each table's value from the seen map; if it doesn't exist, add it
                script_tables['all'] = [(table, seen_tables.setdefault(table, table)) for table in script_tables['all']]
                # modify the seen map with any updated tables
                for table in script_tables['updated']:
                    seen_tables[table] = f"{create_name(job, script, table, i)}__post"
                # modify the script
                convert_script(script, job, script_tables, out_dir, i, in_dir)
                # also use create_name to generate a list of job-scripts
                orig_name = re.search(r'^(\w+__\w+)__.*', create_name(job, script, "placeholder", i)).group(1)
                script_tracker.write(f"{orig_name.replace('__', ',')},{i}\n")
We reference a function, create_name(), which encodes our logic for creating pre and post table names that don't exceed string character limits in the source system:
def create_name(job: str, script: str, table: str, script_num: int) -> str:
    name = f"{job.replace('.', '_')}__{script.replace('.', '_')}__{table}"
    if len(name) > 123:
        print(f"File name too long for table: {table}, script: {script}, job: {job}. Tried: {name}. Using script num.")
        name = f"{job.replace('.', '_')}__{str(script_num)}__{table}"
    return name
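For example (with made-up job and script names), a short name passes through unchanged; presumably the 123-character cutoff leaves room for the __pre/__post suffix under the source system's identifier length limit:

# Hypothetical usage; the job and script names are made up
print(create_name("daily_sales.job", "load_sales.sql", "fact_orders", 3))
# daily_sales_job__load_sales_sql__fact_orders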
Now that we know which post tables to reference, we can go ahead and convert the scripts, saving them to a new location:
def convert_script(sql: str, job: str, tables: dict[str, set[str]],
                   out_dir: str, script_num: int, path: str = "../netezza-code/sql/") -> None:
    table_names_all = tables["all"]
    table_names_update = tables["updated"]
    truncate = tables["truncate"]
    with open(path + sql) as sql_script:
        existing_script = sql_script.read()
    with open(out_dir + sql, "w") as sink:
        maybe_truncate = "\nWHERE 1=0"
        if len(table_names_all) > 0:
            sink.write("/* Added by Databricks for test data generation */\n")
            for table, table_for_reference in table_names_all:
                # if it's a VIEW, we need to materialize it when the script is run to capture deps, so we write it
                # as a TABLE. I am intentionally duplicating code to make this more readable.
                if table_for_reference.startswith("v"):
                    sink.write(f"CREATE OR REPLACE TABLE admin.{create_name(job, sql, table, script_num)}__pre AS\n"
                               f"SELECT * FROM admin.{table_for_reference}\nDISTRIBUTE ON RANDOM;\n\n")
                else:
                    # multiplying the string by the bool keeps the WHERE 1=0 clause only for tables
                    # that get truncated, so their pre state is an empty copy of the schema
                    truncate_status = table in truncate
                    sink.write(f"CREATE OR REPLACE VIEW admin.{create_name(job, sql, table, script_num)}__pre AS\n"
                               f"SELECT * FROM admin.{table_for_reference}{maybe_truncate * truncate_status};\n\n")
        sink.write(existing_script)
        if len(table_names_update) > 0:
            sink.write("\n/* Added by Databricks for test data generation */\n")
            for table in table_names_update:
                sink.write(f"CREATE TABLE admin.{create_name(job, sql, table, script_num)}__post AS\n"
                           f"SELECT * FROM admin.{table}\nDISTRIBUTE ON RANDOM;\n\n")
As we mentioned above in the Considerations section, it may not be safe to run your pipelines as-is because there could be code that touches other production systems. For instance, in our case the Netezza SQL scripts were being triggered by bash scripts which would run the scripts one-at-a-time and fail out if needed. The problem was that these bash scripts also touched a lot of other things such as:
We essentially needed to rewrite these bash scripts so that the commands touching external systems were commented out. How did we scrub the job scripts? We developed a custom parser which runs through the bash script line by line and, depending on the operation being performed, either writes the line out to a new file commented out or as-is. This code required quite a bit of tuning and manual adjustment, but it was worth it to get everything automated. Here is a simple example that scrubs FTP and Oracle-related commands, which you can play with and expand upon:
def scrub_job(job: str, out_dir: str = "scrubbed_jobs/") -> list[tuple[str, list[str]]]:
    with open("../netezza-code/bash-scripts/" + job) as file:
        with open(out_dir + job, "w") as sink:
            # maintain state
            comment_out = False
            in_ftp = False
            fi_counter = 0
            scrubbed_comment = "DATABRICKS SCRUBBED SECTION #############################################\n"
            # Loop through the lines in the file
            for line in file:
                # if we are not in a commented-out section
                if not comment_out:
                    # if it's already commented out, just write it and move on
                    if line.strip().startswith("#"):
                        sink.write(line)
                    # comment out Oracle-related commands
                    elif line.strip().startswith("sqlplus"):
                        sink.write("# " + scrubbed_comment)
                        comment_out = True
                        sink.write("# <Databricks scrubbed>" + line)
                    # handle FTP
                    elif line.strip().startswith("ftp -v") or line.strip().startswith("sftp -v"):
                        sink.write("# " + scrubbed_comment)
                        sink.write("# " + line)
                        in_ftp = True
                    elif in_ftp:
                        sink.write("# " + line)
                        if line.strip().startswith("EOF"):
                            in_ftp = False
                            sink.write("# End " + scrubbed_comment)
                    # handle FTP .ftptest
                    elif line.strip().replace(" ", "").endswith(".ftptest>>$LOG"):
                        sink.write("# " + scrubbed_comment)
                        sink.write("# <Databricks scrubbed>" + line)
                        comment_out = True
                    else:
                        sink.write(line)
                # if we are in a commented-out section
                else:
                    sink.write("# <Databricks scrubbed>" + line)
                    # the hard part here is making sure we handle if-then blocks correctly
                    if line.strip().startswith("if"):
                        fi_counter += 1
                    if line.strip().startswith("fi"):
                        fi_counter -= 1
                    if fi_counter == 0:
                        comment_out = False
                        sink.write("# End " + scrubbed_comment)
                        # ensure we write something in case we end up with an empty if-then-else block
                        sink.write('echo ""\n')
The code you create to achieve this may look very different, or may not be needed at all!
Now that you have an idea of how to approach this, here are some of the edge cases we encountered in our project and how we solved them.
Reading from a VIEW. Our approach persists the pre state as VIEWs. But what if the table we are reading is actually a VIEW? Then it doesn't work, because that VIEW could be referencing tables that were not persisted. The solution: when a VIEW is being read in the script, actually do a CREATE TABLE instead of a CREATE VIEW.
nzload. Some tables were modified with the nzload shell command. Typically, changes to tables occurred through the nzsql shell command, which called SQL scripts, but on roughly forty occasions tables were changed using nzload, which updates a single table without a SQL script. Would that mess up the whole sequence of "seen" tables mentioned above? The solution: we detect nzload when we scan the job script and then do some extra scripting to add those tables into the updated_tables set, so that the NEXT script that runs is modified as if it performed the nzload operation. Then we just add the logic that was previously occurring in the nzload to that next SQL script.
If you've gotten this far, you've finished the hard parts. Loading to Databricks is easy because of the naming convention we used. For each script we took the following steps.
First, list out the post-state tables for that job from your source system using JDBC; they will be the tables matching the pattern <JOB>__<SCRIPT>__.*__post. Load each one into a POST_TABLES schema. This should be in the same catalog where you want to place your test suites.
Next, list out the pre-state VIEWs for the script, which match the pattern <JOB>__<SCRIPT>__.*__pre, and inspect what each VIEW references. If the referenced table looks like a post table (it matches r'^\w+__\w+__\w+__POST$'), load it from the POST_TABLES schema. If the view references a normal table, we need to grab it from the database with your historical data snapshot and save it in the schema devoted to that job and script, with a query like this:
USE <test_suite_catalog_name>.<job_name>__<script_name>;
CREATE OR REPLACE TABLE <table_name>
USING DELTA
TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported') AS
SELECT * FROM <historical_data_load_catalog_name>.<historical_data_load_database_name>.<table_name>
Our final code to create pre-state tables looked like this:
# MAGIC %sql
# MAGIC USE CATALOG $CATALOG;
# MAGIC DROP SCHEMA IF EXISTS $TARGET_SCHEMA;
# MAGIC CREATE SCHEMA $TARGET_SCHEMA;
# MAGIC USE $TARGET_SCHEMA;

# COMMAND ----------

# get a listing of the script's pre views from Netezza
# the script name in the widget should be the complete name
script_views = spark.read.format("jdbc") \
    .options(url='jdbc:netezza://<address>:<port>/<backup_env>',
             user='<username>',
             password='<password>',
             query=f"""SELECT VIEWNAME, DEFINITION FROM _v_view
                       WHERE VIEWNAME LIKE '{JOB.upper()}\\_\\_{SCRIPT.upper()}\\_\\_%'
                          OR VIEWNAME LIKE '{JOB.upper()}\\_\\_{SCRIPT_NUM}\\_\\_%'""",
             driver='org.netezza.Driver') \
    .load() \
    .collect()


def create_pre_table(definition: tuple[str, str]) -> None:
    table_name = re.search(r'^\w+__\w+__(\w+)__PRE$', definition[0]).group(1)
    print(f"Creating table '{table_name}' from Netezza view: {definition[0]}")
    reference_table = re.search(r'.* FROM ADMIN.(.*)$', definition[1]).group(1).removesuffix(";")
    print(f"\tView references {reference_table}")
    if bool(re.match(r"^\w+__\w+__\w+__POST.*$", reference_table)):
        print("\tHandle as a post table")
        spark.sql(f"""
            CREATE OR REPLACE TABLE {table_name}
            SHALLOW CLONE <historical_data_load_catalog_name>.<historical_data_load_database_name>.{table_name}
        """)
        spark.sql(f"TRUNCATE TABLE {table_name}")
        if not reference_table.endswith("WHERE (1 = 0)"):
            spark.read.table("post_tables." + reference_table).drop("ROWID").write.mode("append").option("mergeSchema", "true").saveAsTable(table_name)
    else:
        print("\tHandle as a normal table")
        sql_query = f"""
            CREATE OR REPLACE TABLE {table_name}
            USING DELTA
            TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported') AS
            {re.sub(r"SELECT .* FROM ADMIN.", "SELECT * FROM <historical_data_load_catalog_name>.<historical_data_load_database_name>.", definition[1])}
        """
        print(sql_query)
        spark.sql(sql_query)


for view_def in script_views:
    create_pre_table(view_def)
There may also be pre-state TABLEs in the source system matching <JOB>__<SCRIPT>__.*__pre (for example, from the VIEW edge case above). Handle them the same way you handled the post tables, but put them into the actual schema for the test, not just POST_TABLES.
In Part 1 of this series we explored considerations and a novel approach to curating robust and thorough test data. With the framework laid out and the sample code provided, I hope your team is well prepared to execute this approach. Please remember that the code provided is inspiration to get your team thinking about implementation and edge cases. In reality, your code may look different because of the source system and the idiomatic usage therein.
The most important aspect of this is the conceptual approach, which can be a little difficult to understand and communicate to your stakeholders. My advice is to make sure you really understand and can answer questions like: why use VIEWs for the pre state instead of tables?
It's very important to have a detail-oriented team member run the scripts in the source system and keep a log of any issues that arise. Additionally, if there are any logs produced by the source system, make sure to get access to them, as they can be helpful in triaging issues. I also suggest you elect one team member to be your "test manager" who will develop and maintain the code outlined in this article and the next.
If you've gotten this far and are thinking to yourself, "wow, this is a lot, I'm not sure I want to go this route", rest assured: when we started, the code was very simple. As we ran into more edge cases, the codebase grew. For a small migration you can skip the complexity of the pre-state VIEWs entirely: just duplicate your data and persist TABLEs! For longer migrations, allocate a few weeks up front to get this going, and then convert and run a batch of pipelines every few weeks. That way you have space to triage and fix any issues.
Happy coding!