Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
Dan_Z
Databricks Employee

Introduction to the Series

I recently led a large migration from on-premise Netezza to Azure Databricks for a major North American retailer. As part of this, we needed to:

  • convert all the SQL scripts to Spark
  • convert all the orchestration code 
  • validate that everything was working correctly

That last point seems like an afterthought, "oh, I'll just make sure it works, no biggie". But if you're reading this you understand data validation is, by far, the most challenging aspect of a migration between two systems. In this two-part blog series, you'll learn:

  1. considerations and a novel approach to curating robust and thorough test data, and
  2. approaches and automation for creating and running tests, for elegant and effective test suites.

Some questions we will explore in this series are:

  • How can we obtain test data at both the script and pipeline level for data validation?
  • What are suitable metrics for determining whether data validation has been achieved?
  • How can we run tests and not affect other tests? For instance if they share tables?
  • How do we make one-click test suites that SQL-only team members can use?
  • How do we leverage Databricks' Unity Catalog to make everything even more powerful?

In this post we'll be focusing on curating robust and thorough test data. How can we get comprehensive data to run our tests with? Stay tuned for part two where we will delve into approaches and automation for creating and running tests.

Overview of Part 1: Test Data Curation

When your team is migrating script code and pipeline code to Databricks, there are three main steps:

  1. Look at the original code, understand what it's doing
  2. Convert the code to run on Databricks (convert SQL to Spark SQL or other code to Python/Scala/etc.)
  3. Test the code to make sure it works, fixing as needed

While the first two steps don't necessarily require any data, the third step does. To ensure that a script is converted correctly you typically want to make sure:

  • the code compiles
  • there are no runtime errors
  • any updated tables match what the previous system would have created
  • any output files match what the previous system would have created

To do this, for each script and pipeline being converted, we need:

  • pre data: data for each table in each script that a pipeline needs to run, and
  • post data: data for each table that changed in each script, representing what it should look like after the script or pipeline runs.

So, if you are testing a pipeline that has three scripts, and the three scripts all read and then update the same two tables, you will need three copies of each table for pre data (total of six pre tables), and three copies of each table for post data (also six tables). Note that the pre data for one pipeline may be the post data for another. We will use this later on to save space on disk.
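
To make the arithmetic concrete, here is a small sketch that counts how many pre and post tables a pipeline needs. The function name and the input shape (one pair of table lists per script) are assumptions made for illustration; they are not part of our project code.

```python
def count_test_tables(script_tables):
    """Count pre and post tables for a pipeline.

    script_tables holds one (tables_read, tables_updated) pair per script.
    Every table a script reads or updates needs a pre copy; every table it
    updates needs a post copy.
    """
    pre = sum(len(set(reads) | set(updates)) for reads, updates in script_tables)
    post = sum(len(set(updates)) for _, updates in script_tables)
    return pre, post


# Three scripts that all read and then update the same two tables.
pipeline = [(["table_a", "table_b"], ["table_a", "table_b"])] * 3
pre_count, post_count = count_test_tables(pipeline)
print(pre_count, post_count)  # 6 6
```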

Solution Approach

The solution we propose to generate the pre and post data is shown below:

[Figure: data_data_gen.png]

  1. Get all of the code for scripts and pipelines being run in the source environment as well as a "run order" that can be used to set a sequence in which the scripts will run.
  2. Process each script in the sequence defined in the run order, extract which tables are referenced and which tables are updated. Use a tool like sqlglot (or create your own) to achieve this.
  3. Make a new version of each script, but at the top of the script add a section that writes out the pre state of each table being referenced. At the bottom add a section that persists out the post state of the tables that are changed. 
  4. Run the modified pipelines in the source system in the designated run order. The scripts must be run in a locked-down environment where a snapshot of the production data has been taken and nothing else should be happening in that environment. This will create pre and post data in the source system.
  5. Import that pre and post data into Databricks. Use Unity Catalog to create a new catalog for your test data, then for each script make a new schema and load it with the tables needed to run the script. Put all the post table data in its own schema.
  6. When you run the test suite for a script, run it against the created schema for that script and validate by comparing the changed tables against the correct post table versions.
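
As a rough illustration of step 5, the sketch below builds the SQL statements that lay out a test catalog: one schema for all post tables and one schema per job and script. The catalog name migration_tests and the helper name are assumptions; the schema naming follows the job__script convention used later in this post. The function only builds strings, so you would still pass each statement to spark.sql() or a SQL editor.

```python
def schema_setup_statements(catalog, job_scripts):
    """Build Unity Catalog DDL for a test-data catalog.

    job_scripts is a list of (job_name, script_name) pairs.
    """
    statements = [
        f"CREATE CATALOG IF NOT EXISTS {catalog}",
        # one shared schema that holds every post table
        f"CREATE SCHEMA IF NOT EXISTS {catalog}.post_tables",
    ]
    for job, script in job_scripts:
        # one schema per script, holding the pre data that script needs
        statements.append(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{job}__{script}")
    return statements


statements = schema_setup_statements(
    "migration_tests", [("job_a", "script_1"), ("job_a", "script_2")]
)
for statement in statements:
    print(statement)
```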

Required

  • Access to all the original scripts from the source system.
  • Access to all the job scripts from the source system.
  • An agreed-upon run order where every script and pipeline is run one-at-a-time.
  • A historical snapshot of production-quality data taken from the production source environment.
  • An isolated source environment where scripts can be run and no other changes to the tables are being made, i.e. no other jobs being run. This should be loaded with the snapshot above.
  • A backup isolated source environment with the same snapshot loaded, but used as a backup in case anything goes wrong. Should be completely static.
  • A source environment expert willing and able to run all the jobs in the source environment and triage any issues.

Considerations

While the process seems simple, there are a number of complexities which can crop up:

  • We might need to scrub the pipelines to be able to run in their historical snapshot environment, i.e. if the pipeline:
    • sends e-mails
    • places files in production FTP
    • touches other production systems/resources
  • It may not be feasible to save out copies of each table, because we would run out of disk space, so we can save out VIEWs instead for the pre data, where either the original data from the historical snapshot or a previous post table in the run order is used.
  • We need a naming convention to ensure every table can be tracked down to its jobs, script, table name, and whether it is pre or post. Consider [job_name]__[script_name]__[table_name]__[pre or post].
    • e.g. job_a__script_1__table_x__pre, or job_b__script_2__table_y__post
    • What if this name becomes longer than the limit?
  • Some scripts may be used more than once in the same pipeline, so we need to handle separate versions in our naming
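
One way to check that a naming convention really is traceable is to write the reverse function. The sketch below splits a name back into its parts. It assumes job and script names never contain a double underscore, and the names TrackedTable and parse_test_table_name are made up for this example.

```python
from typing import NamedTuple


class TrackedTable(NamedTuple):
    job: str
    script: str
    table: str
    state: str  # "pre" or "post"


def parse_test_table_name(name):
    """Split [job]__[script]__[table]__[pre or post] into its components."""
    # The first two separators end the job and script names (assumed to hold
    # no double underscore themselves); the last one starts the pre/post suffix.
    job, script, rest = name.split("__", 2)
    table, state = rest.rsplit("__", 1)
    if state not in ("pre", "post"):
        raise ValueError(f"Expected a pre or post suffix, got: {state!r}")
    return TrackedTable(job, script, table, state)


parsed = parse_test_table_name("job_a__script_1__table_x__pre")
print(parsed)
```

If a name cannot be parsed this way, that is a good early warning that the convention is ambiguous for your job and script names.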

Because of complexities like these, the solution you come up with will be non-trivial and bespoke for your specific source architecture. For instance, the way you handle this if you're migrating from Netezza SQL scripts triggered by bash scripts (our case) will be fairly different from the solution if you are migrating from SQL Server triggered by stored procedures. Know you are signing up to write code and think through issues!

Script Modification

The first step is to rewrite the pipeline scripts such that they persist out the pre and post data when they get run in the source system. Going forward, I'll assume these are written in some SQL dialect.

The process here is to:

  1. Determine what tables the script touches
  2. Determine what tables the script modifies
  3. Add a section at the top of the script which will persist out the pre state of the script, for all tables touched
  4. Add a section at the bottom which will persist out the post state of the script, for tables that were modified

Parsing all touched tables

We need to scan each script and figure out which tables it touches. This is not trivial. We leveraged the Python package sqlglot for this, which parses SQL into an AST from which the table references can be read. When using sqlglot you need to specify a dialect. Unfortunately for us, at the time of our project Netezza was not one of the 20 dialects supported. In spite of this, we got it working by setting the dialect to postgres, which Netezza is based on, and writing a cleaner that uses regex patterns to strip out the parts of the code that are specific to Netezza. Here is the Python code used to parse a single SQL query:

 

 

import re

from sqlglot import parse_one, exp
def extract_query_tables(query: str, tables: set[str], 
                         truncates: set[str], 
                         temp_tables: set[str], 
                         debug=False):
    # True if the table node, or one of its two nearest ancestors, is a CREATE TEMPORARY statement
    def is_temp_table(table_ast, depth: int = 0) -> bool:
        if depth == 3:
            return False
        elif table_ast is None:
            return False
        elif table_ast.sql().startswith("CREATE TEMPORARY"):
            return True
        else:
            return is_temp_table(table_ast.parent, depth + 1)
    def fmt_table(table_name: str) -> str:
        # remove the database if it exists and make lower case
        return re.sub(r"^.*[.]", "", table_name).lower()
    # sometimes there is a random ; in there which causes problems
    if query != ";":
        try:
            ast = parse_one(query, read = 'postgres')
        except Exception as e:
            print("Parse failed for SQL:")
            print(query)
            raise e
        if ast.name.lower() == "truncate":
            truncate_table = ast.sql().split(" ")[-1]
            truncates.add(fmt_table(truncate_table))
            tables.add(fmt_table(truncate_table))
            if debug:
                print(f"Added truncate table {truncate_table}")
        else:
            for table in ast.find_all(exp.Table):
                if is_temp_table(table):
                    temp_tables.add(fmt_table(table.name))
                elif table.name != "flat_file":
                    name_scrub = fmt_table(table.name)
                    tables.add(name_scrub)
def clean_sql_scripts(sql_contents_raw: str) -> str:
    # because we are using sqlglot, and sqlglot does not have a pre-built dialect 
    # for Netezza, we had to make a decision. Either write a dialect parser for sqlglot 
    # or just get rid of anything in the script that is not generic SQL. This function 
    # addresses the second option.
    sql_contents = sql_contents_raw
    replacements = [
        # get rid of \time
        (r"\\time", ""),
        # clean up character varying
        (r" varying", ""),
        # get rid of generate statistics
        (r"generate statistics on \w+ ?;?", ""),
        ...
        (r" byteint\n", " int\n")
    ]
    for pattern, replacement in replacements:
        sql_contents = re.sub(pattern, replacement, sql_contents, flags=re.MULTILINE)
    return sql_contents

 

 

The first function, extract_query_tables, will take a query and three sets and add items to the sets based on what it finds. It does a few other important things:

  • Handles TEMPORARY tables by ignoring them. We don't need that information in making the pre and post state.
  • Handles tables whether they are provided in format database.table or just table using the fmt_table function.
  • Handles queries that are just ;

The second function, clean_sql_scripts, is run before extract_query_tables and uses regex to scrub Netezza-specific syntax. We built the pattern list by trial and error, and it will likely need to be re-created for future projects. Alternatively, you could write a dialect parser for sqlglot.
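
To get a feel for what the cleaner does, here is a small standalone sketch that applies three of the patterns shown above to a made-up Netezza snippet. The sample SQL and the names REPLACEMENTS and clean are illustrative assumptions. Note that the patterns are case-sensitive as written, so your own list may need re.IGNORECASE or upper-case variants.

```python
import re

# A subset of the replacement patterns from clean_sql_scripts.
REPLACEMENTS = [
    (r"\\time", ""),                            # nzsql timing command
    (r" varying", ""),                          # "character varying" -> "character"
    (r"generate statistics on \w+ ?;?", ""),    # Netezza statistics statement
]


def clean(sql):
    """Apply every replacement pattern in order and return the cleaned SQL."""
    for pattern, replacement in REPLACEMENTS:
        sql = re.sub(pattern, replacement, sql, flags=re.MULTILINE)
    return sql


sample = (
    "\\time\n"
    "CREATE TABLE t (name character varying(10));\n"
    "generate statistics on t;\n"
)
cleaned = clean(sample)
print(cleaned)
```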

Parsing modified tables

Parsing modified tables could also be done using the ASTs, but since they are fairly easy to spot, the easier option is to use a simple regex pattern:

r'(?:\binsert\s+into|\bupdate|\bdelete\s+from)\s+([^\s\(\)]+)'

Here is the code we used to do this. It calls the two functions from the previous section:

 

 

import re

import sqlparse

def extract_script_tables(sql: str, 
                          path: str = "../netezza-code/sql/", 
                          debug=False) -> dict[str, set[str]]:

    with open(path + sql) as sql_script:
        sql_contents_raw = sql_script.read()

    sql_contents = clean_sql_scripts(sql_contents_raw)
    if debug:
        print("Printing modified script SQL:")
        print(sql_contents)
    queries: list[str] = sqlparse.split(sql_contents)

    # find all table names in each query
    table_names_all: set[str] = set()
    truncate: set[str] = set()
    temp_tables: set[str] = set()

    for query in queries:
        extract_query_tables(query, table_names_all, truncate, temp_tables)

    with open(path + sql) as sql_script:
        filtered_lines = []
        for line in sql_script:
            # Skip lines that start with '--' or '/*'
            if line.startswith('--') or line.startswith('/*'):
                continue
            # Append the line to the filtered lines list
            filtered_lines.append(line)
        filtered_script = ''.join(filtered_lines)
        table_pattern = r'(?:\binsert\s+into|\bupdate|\bdelete\s+from)\s+([^\s\(\)]+)'

        # Find all matches of table names in the script
        table_names_raw: set[str] = set(re.findall(table_pattern, filtered_script, flags=re.IGNORECASE))
        table_names_update: set[str] = {item.lower().removeprefix("admin.") for item in table_names_raw}
        if debug:
            print(f"Regex found updated tables: {table_names_update}")

    # remove the temp tables
    table_names_all.difference_update(temp_tables)
    table_names_update.difference_update(temp_tables)

    # add update tables to all tables if not there
    table_names_all.update(table_names_update)

    return {
        'all': table_names_all,
        'updated': table_names_update,
        'truncate': truncate
    }

 

 

This function will:

  1. Read the script from disk and clean it by calling clean_sql_scripts to get rid of anything that is Netezza-specific syntax.
  2. Split the script's text into separate SQL queries (sqlparse.split()) which can be fed to extract_query_tables.
  3. Initialize three sets: one for "all tables", one for tables that get truncated, and one for temp tables. We need the latter two for additional processing.
  4. Parse the queries, updating the common sets.
  5. Parse the script again using regex to get the modified tables. Add these to a new set just for updated tables, and to the previous "all tables" set in case sqlglot didn't pick them up.
  6. Remove any temp tables from our "all tables" and updated tables sets.
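
The regex step can be tried in isolation. This sketch runs the same pattern over a made-up script (the table names are invented) and normalizes the matches the way extract_script_tables does. It needs Python 3.9+ for str.removeprefix.

```python
import re

TABLE_PATTERN = r'(?:\binsert\s+into|\bupdate|\bdelete\s+from)\s+([^\s\(\)]+)'

script = """
INSERT INTO admin.sales_fact SELECT * FROM staging_sales;
UPDATE Customer_Dim SET active = 0 WHERE id = 1;
DELETE FROM admin.tmp_orders WHERE 1=1;
"""

# The capture group is whatever token follows the INSERT INTO / UPDATE / DELETE FROM keyword.
raw_matches = re.findall(TABLE_PATTERN, script, flags=re.IGNORECASE)

# Lower-case the names and drop the schema prefix so they line up with the sqlglot results.
updated_tables = {name.lower().removeprefix("admin.") for name in raw_matches}
print(sorted(updated_tables))  # ['customer_dim', 'sales_fact', 'tmp_orders']
```

Keep in mind that a pattern like this can also match keywords in places you did not intend, such as inside string literals or inline comments, so it is worth eyeballing the output for a few scripts.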

Augment Scripts

Now we need to add a section at the top of each SQL script to persist the pre state of the script, and a section at the bottom to persist the post state. Augmenting the scripts would be easy if we had infinite space to work with (or very little data): we would just save out a copy of each table as it is at that time. For example, let's say you have this script:

 

-- original script
INSERT INTO table_a
SELECT * FROM table_b;

 

You could change it to look like this (naive solution):

 

-- persist the pre state (naive solution)
CREATE TABLE some_job__some_script__table_a__pre AS
SELECT * FROM table_a;

CREATE TABLE some_job__some_script__table_b__pre AS
SELECT * FROM table_b;

-- original script
INSERT INTO table_a
SELECT * FROM table_b;

-- persist the post state (only the changed table)
CREATE TABLE some_job__some_script__table_a__post AS
SELECT * FROM table_a;

 

Unfortunately, this would mean that if some large fact table was read by 99 scripts, it would need to be persisted 99 times to generate the pre test data, and many of those copies would hold exactly the same data. This does not scale!

What's the solution? Use VIEWs instead, which reference previous post tables. This would turn into:

 

-- persist the pre state (optimal solution)
CREATE VIEW some_job__some_script__table_a__pre AS
SELECT * FROM table_a;

CREATE VIEW some_job__some_script__table_b__pre AS
SELECT * FROM some_job__previous_script__table_b__post;

-- original script
INSERT INTO table_a
SELECT * FROM table_b;

-- persist the post state (only the changed table)
CREATE TABLE some_job__some_script__table_a__post AS
SELECT * FROM table_a;

 

Where:

  • table_a is being used for the first time in the run order, and 
  • table_b was previously updated by the script previous_script in the same job.

If we do it this way, then all we have to do to load it into Databricks is batch-load all the tables ending with the suffix __post and then materialize all the VIEWs. We'll go over that later on.
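
The VIEW-based layout can be generated mechanically once you know, for each table, which object its pre VIEW should read from. Here is a simplified sketch of that idea. The function name and arguments are assumptions for illustration, and it leaves out the Netezza-specific details (the admin schema, DISTRIBUTE ON, truncation handling) that our real conversion code deals with.

```python
def augment_script(job, script, body, pre_sources, updated):
    """Wrap a SQL script body with pre-state VIEWs and post-state TABLEs.

    pre_sources maps each table the script touches to the object its pre VIEW
    selects from: the snapshot table itself, or an earlier __post table.
    updated lists the tables the script modifies.
    """
    prefix = f"{job}__{script}"
    parts = []
    for table, source in pre_sources.items():
        parts.append(f"CREATE VIEW {prefix}__{table}__pre AS\nSELECT * FROM {source};\n")
    parts.append(body.rstrip() + "\n")
    for table in updated:
        parts.append(f"CREATE TABLE {prefix}__{table}__post AS\nSELECT * FROM {table};\n")
    return "\n".join(parts)


augmented = augment_script(
    job="some_job",
    script="some_script",
    body="INSERT INTO table_a\nSELECT * FROM table_b;",
    pre_sources={
        "table_a": "table_a",  # first use in the run order: read the snapshot
        "table_b": "some_job__previous_script__table_b__post",
    },
    updated=["table_a"],
)
print(augmented)
```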

Maintaining a hashmap of seen tables

So you might be scratching your head thinking, how do you determine what post table to reference in a given VIEW? The solution is to:

  • use your set job (and script) run order, and then
  • parse the scripts in the order in which they will be run in the snapshot environment

Think about it this way: the first script of the first job will get all its tables from the snapshot taken, not from a post table, since none exist yet. When a script updates a table, it adds the table to a hashmap of seen tables, along with the name of the post table it persists it out as. Subsequent scripts look up each table they touch in the seen hashmap. If the table is there, its pre VIEW references the post table recorded there; otherwise it references the snapshot table. Any table a script updates then has its entry replaced with that script's own post table name.

Here is the function that does this:

 

def process_sequence(job_scripts: list[tuple[str, list[tuple[str, list[str]]]]],
                     in_dir: str,
                     out_dir: str = "converted_sql/"):
    # Analyze each script one-at-a-time
    # 1. For each script, categorize tables we encounter into three non-exclusive buckets:
    #    - update tables: tables that are changed during the script
    #    - all tables: tables that are read or updated during the script
    #    - truncate tables: tables that should be truncated before saving
    # 2. After a script is parsed and the tables are categorized, we check whether the 'all' tables exist in our
    #    'seen_tables' dictionary. If so, we grab the value for that in the dict (call it 'v') if not, we add it to
    #    the dictionary with its current string as populated as 'v'.
    # 3. We create VIEWs at the top of the script for all 'all' tables with the qualified name in format
    #    script__table__pre point to the v table
    # 4. We process the update tables. At the bottom of the script we create a new table for each of the tables
    #    modified during the script's run. The saved name of the table is used to populate the corresponding value
    #    for the seen tables dictionary such that future usages will call that table.
    
    seen_tables = {}

    with open(out_dir + "job_scripts.csv", "w") as script_tracker:
        script_tracker.write("job,script,script_num\n")
        for job, scripts in job_scripts:
            print("Converting for job: " + job)
            for i, script_w_nzload_table in enumerate(scripts):
                script, nzload_tables = script_w_nzload_table
                script_tables: dict[str, set[str]] = extract_script_tables(script, in_dir)

                # add in the nzload tables for the script
                script_tables["nzload"] = set(nzload_tables)

                # update the other sets
                if len(nzload_tables) > 0:
                    # since 'update' and 'all' may be empty
                    all_set = script_tables.setdefault("all", set())
                    all_set.update(nzload_tables)

                    update_set = script_tables.setdefault("updated", set())
                    update_set.update(nzload_tables)

                    print(f"Updated {script} to include nzload tables {', '.join(nzload_tables)}")

                # get the tables value from seen map, if they don't exist, add it
                script_tables['all'] = [(table, seen_tables.setdefault(table, table)) for table in script_tables['all']]

                # modify the seen map with any updated tables
                [seen_tables.update({table: f"{create_name(job, script, table, i)}__post"})
                 for table in script_tables['updated']]

                # modify the script
                convert_script(script, job, script_tables, out_dir, i, in_dir)

                # also use create_name to generate a list of job-scripts
                orig_name = re.search(r'^(\w+__\w+)__.*', create_name(job, script, "placeholder", i)).group(1)
                script_tracker.write(f"{orig_name.replace('__', ',')},{i}\n")
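
If the bookkeeping is hard to picture, a toy trace can help. The sketch below is a stripped-down version of the seen-tables logic only: no file handling, no nzload, no name-length fallback. The run order and the helper name trace_seen_tables are invented for the example.

```python
def trace_seen_tables(run_order):
    """Trace which object each pre VIEW would read from.

    run_order is a list of (job, script, tables_read, tables_updated).
    Returns the per-script pre sources and the final seen-tables map.
    """
    seen = {}
    plan = []
    for job, script, reads, updates in run_order:
        all_tables = sorted(set(reads) | set(updates))
        # First sighting of a table maps it to itself (the snapshot copy).
        pre_sources = {table: seen.setdefault(table, table) for table in all_tables}
        # After the script runs, later scripts should read its post tables.
        for table in updates:
            seen[table] = f"{job}__{script}__{table}__post"
        plan.append((script, pre_sources))
    return plan, seen


run_order = [
    ("job_a", "script_1", ["table_b"], ["table_a"]),
    ("job_a", "script_2", ["table_a", "table_b"], ["table_b"]),
]
plan, seen = trace_seen_tables(run_order)
for script, pre_sources in plan:
    print(script, pre_sources)
```

In this trace, script_2 reads table_a from script_1's post table, while table_b still comes from the snapshot because nothing has updated it yet.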

 

We reference a function, create_name(), which encodes our logic for creating pre and post table names that don't exceed the character limits of the source system:

 

def create_name(job: str, script: str, table: str, script_num: int) -> str:
    name = f"{job.replace('.', '_')}__{script.replace('.', '_')}__{table}"
    if len(name) > 123:
        print(f"File name too long for table: {table}, script: {script}, job: {job}. Tried: {name}. Using script num.")
        name = f"{job.replace('.', '_')}__{str(script_num)}__{table}"
    return name

 

Code to convert scripts

Now that we know which post tables to reference, we can go ahead and convert the scripts, saving them to a new location:

 

def convert_script(sql: str, job: str, tables: dict[str, set[str]],
                   out_dir: str, script_num: int, path: str = "../netezza-code/sql/") -> None:
    table_names_all = tables["all"]
    table_names_update = tables["updated"]
    truncate = tables["truncate"]

    with open(path + sql) as sql_script:
        existing_script = sql_script.read()

    with open(out_dir + sql, "w") as sink:
        maybe_truncate = "\nWHERE 1=0"
        if len(table_names_all) > 0:
            sink.write("/* Added by Databricks for test data generation */\n")
            for table, table_for_reference in table_names_all:

                # if it's a VIEW, we need to materialize it when the script is run to capture deps, so we write it
                # as a TABLE. I am intentionally duplicating code to make this more readable.
                if table_for_reference.startswith("v"):
                    sink.write(f"CREATE OR REPLACE TABLE admin.{create_name(job, sql, table, script_num)}__pre AS \
                            \nSELECT * FROM admin.{table_for_reference}\nDISTRIBUTE ON RANDOM;\n\n")
                else:
                    truncate_status = table in truncate
                    sink.write(f"CREATE OR REPLACE VIEW admin.{create_name(job, sql, table, script_num)}__pre AS \
                            \nSELECT * FROM admin.{table_for_reference}{maybe_truncate * truncate_status};\n\n")

        sink.write(existing_script)

        if len(table_names_update) > 0:
            sink.write("\n/* Added by Databricks for test data generation */\n")
            for table in table_names_update:
                sink.write(f"CREATE TABLE admin.{create_name(job, sql, table, script_num)}__post AS \
                \nSELECT * FROM admin.{table}\nDISTRIBUTE ON RANDOM;\n\n")

 

Job script modification

As we mentioned above in the Considerations section, it may not be safe to run your pipelines as-is because there could be code that touches other production systems. For instance, in our case the Netezza SQL scripts were being triggered by bash scripts which would run the scripts one-at-a-time and fail out if needed. The problem was that these bash scripts also touched a lot of other things such as:

  • Oracle servers
  • SFTP/FTP servers
  • Email daemons

We essentially needed to re-write these bash scripts so that these commands that touched external systems were commented out. How did we scrub the job scripts? We developed a custom parser which would run through the bash script line-for-line and depending on what operation was being completed it would either write out the line to a new file commented out, or as-is. This code required quite a bit of tuning and manual adjustment, but it was worth it to get everything automated. Here is a simple example where we scrub FTP and Oracle-related commands that you can play with and expand upon:

 

def scrub_job(job: str, out_dir: str = "scrubbed_jobs/") -> None:
    with open("../netezza-code/bash-scripts/" + job) as file:
        with open(out_dir + job, "w") as sink:

            # maintain state
            comment_out = False
            in_ftp = False
            fi_counter = 0
            scrubbed_comment = "DATABRICKS SCRUBBED SECTION #############################################\n"

            # Loop through the lines in the file
            for line in file:

                # if we are not in a commented out section
                if not comment_out:

                    # if it's already commented out, just write it and move on
                    if line.strip().startswith("#"):
                        sink.write(line)

                    # comment out Oracle-related commands
                    elif line.strip().startswith("sqlplus"):
                        sink.write("# " + scrubbed_comment)
                        comment_out = True
                        sink.write("# <Databricks scrubbed>" + line)

                    # handle FTP
                    elif line.strip().startswith("ftp -v") or line.strip().startswith("sftp -v"):
                        sink.write("# " + scrubbed_comment)
                        sink.write("# " + line)
                        in_ftp = True

                    elif in_ftp:
                        sink.write("# " + line)
                        if line.strip().startswith("EOF"):
                            in_ftp = False
                            sink.write("# End " + scrubbed_comment)

                    # handle FTP .ftptest
                    elif line.strip().replace(" ", "").endswith(".ftptest>>$LOG"):
                        sink.write("# " + scrubbed_comment)
                        sink.write("# <Databricks scrubbed>" + line)
                        comment_out = True

                    else:
                        sink.write(line)

                # if we are in a commented out section
                else:
                    sink.write("# <Databricks scrubbed>" + line)
                    # the hard part here is making sure we handle if-then blocks correctly
                    if line.strip().startswith("if"):
                        fi_counter += 1
                    if line.strip().startswith("fi"):
                        fi_counter -= 1
                        if fi_counter == 0:
                            comment_out = False
                            sink.write("# End " + scrubbed_comment)
                            # ensure we write something in case we end up with an empty if-then-else block
                            sink.write('echo ""\n')

 

The code you create to achieve this may look very different, or may not be needed at all!
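
If you want to experiment with the state-machine idea without touching files, a miniature version that works on in-memory lines is easier to test. The sketch below only handles FTP here-documents that end with EOF. The function name, the matching rules, and the sample job are assumptions for illustration, not our production scrubber.

```python
def scrub_ftp_blocks(lines):
    """Comment out ftp/sftp here-document blocks in a list of bash lines."""
    scrubbed = []
    in_ftp = False
    for line in lines:
        stripped = line.strip()
        if not in_ftp and stripped.startswith(("ftp ", "sftp ")):
            # start of an FTP block: comment it out and remember we are inside
            in_ftp = True
            scrubbed.append("# " + line)
        elif in_ftp:
            scrubbed.append("# " + line)
            if stripped == "EOF":
                # the here-document terminator closes the block
                in_ftp = False
        else:
            scrubbed.append(line)
    return scrubbed


job_lines = [
    "echo start",
    "ftp -v host <<EOF",
    "put file.csv",
    "EOF",
    "echo done",
]
scrubbed_lines = scrub_ftp_blocks(job_lines)
print("\n".join(scrubbed_lines))
```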

Edge cases handled

So now you have an idea of how to approach this, here are some of the edge cases we encountered in our project and how we solved them.

Re-use of scripts

  • Problem: if a script was re-used, then rewriting it with the code to persist the pre and post state would overwrite the earlier converted file. Whenever that script ran, it would only persist the state for its last use.
  • Solution: keep track of SQL scripts that have been run. If one gets run again, rename it and also rename it in the job script.
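
A minimal sketch of that tracking is below. The __runN suffix is a made-up convention and the function name is hypothetical; it also assumes scripts are referenced by bare file name. You would still need to apply the same rename inside the job script.

```python
from collections import Counter
from pathlib import PurePosixPath


def unique_script_name(script, run_counts):
    """Return a file name that is unique for each use of a script."""
    run_counts[script] += 1
    run_number = run_counts[script]
    if run_number == 1:
        return script
    path = PurePosixPath(script)
    # e.g. load.sql -> load__run2.sql on its second use
    return f"{path.stem}__run{run_number}{path.suffix}"


run_counts = Counter()
converted_names = [
    unique_script_name(script, run_counts)
    for script in ["load.sql", "agg.sql", "load.sql"]
]
print(converted_names)  # ['load.sql', 'agg.sql', 'load__run2.sql']
```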

Dealing with VIEWs

  • Problem: When we write the pre state, we write VIEWs. But what if the object we are reading is itself a VIEW? Then it doesn't work, because that VIEW could be referencing tables that were not persisted.
  • Solution: If we see that a VIEW is being read in the script, do a CREATE TABLE instead of a CREATE VIEW.

Usage of nzload

  • Problem: A big problem in our project was the use of the nzload shell command. Changes to tables usually occurred through the nzsql shell command, which called SQL scripts. But on roughly forty occasions tables were changed using nzload, which would update a single table without a SQL script. Would that mess up the whole sequence of "seen" tables mentioned above?
  • Solution: Get the tables modified by nzload when we scan the job script, then do some fairly involved scripting to add them to the updated tables set of the NEXT script that runs after the nzload operation. Then we just add the logic that was previously occurring in the nzload to that next SQL script.
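
The core of that idea, carrying nzload tables forward to the next SQL script, can be sketched in a few lines. The input format here (a list of ("nzsql", script) and ("nzload", table) steps already extracted from the job script) is an assumption; the output pairs each script with its nzload tables, which is the shape process_sequence expects.

```python
def attach_nzload_tables(steps):
    """Pair each SQL script with the nzload tables loaded just before it.

    steps is a list of (kind, name) tuples where kind is "nzsql" (name is a
    script) or "nzload" (name is a table). Returns the paired scripts plus any
    trailing nzload tables that had no following script.
    """
    scripts = []
    pending = []
    for kind, name in steps:
        if kind == "nzload":
            pending.append(name)
        elif kind == "nzsql":
            scripts.append((name, pending))
            pending = []
    return scripts, pending


steps = [
    ("nzsql", "a.sql"),
    ("nzload", "table_x"),
    ("nzload", "table_y"),
    ("nzsql", "b.sql"),
]
scripts, leftover = attach_nzload_tables(steps)
print(scripts)   # [('a.sql', []), ('b.sql', ['table_x', 'table_y'])]
print(leftover)  # []
```

Anything left in the second return value is an nzload with no script after it in the job, which needs separate handling.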

Loading to Databricks

If you've got this far, you've finished the hard parts. Loading to Databricks is easy because of the naming convention we used. For each script we took the following steps.

Load the post data for the script into the catalog

List out the post state tables for that job from your source system using JDBC. They will be tables that match the pattern <JOB>__<SCRIPT>__.*__post. Then for each table we:

  1. Extract the correct table schema for that table from the database with your historical data snapshot
  2. Using that schema, read the post table from JDBC
  3. Save it to a special schema for your post tables, the POST_TABLES schema. This should be in the same catalog where you want to place your test suites.

Load the pre data into the test suite schema

  1. All the pre-state tables are views, so we search the Netezza database for views that match the pattern <JOB>__<SCRIPT>__.*__pre
  2. We get two types of views:
    • Views that reference a normal table (comes from historical data load) 
    • Views that reference a post-state table
  3. Determine which is which by checking whether the referenced table matches the regex pattern r'^\w+__\w+__\w+__POST$'

    If the view references a normal table, we need to grab it from the database with your historical data snapshot and save it in the schema that is devoted to that job and script. Run a query like this:

    USE <test_suite_catalog_name>.<job_name>__<script_name>;
    
    CREATE OR REPLACE TABLE <table_name>
    USING DELTA
    TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported') AS
    SELECT * FROM <historical_data_load_catalog_name>.<historical_data_load_database_name>.<table_name>
  4. If the view references a post state table, we need to grab it from our current catalog in the POST_TABLES schema and load it in, being careful to match the schema.
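
The classification in step 3 can be tested on its own before you wire it up to JDBC. The sketch below pulls the referenced table out of a view definition and labels it. The sample definitions are invented, and the exact text your source system stores for a view definition may differ, so treat the extraction regex as an assumption to adapt.

```python
import re

POST_TABLE = re.compile(r"^\w+__\w+__\w+__POST$")
REFERENCE = re.compile(r"\bFROM\s+ADMIN\.(\w+)", flags=re.IGNORECASE)


def classify_view(definition):
    """Return (referenced_table, kind) where kind is 'post' or 'snapshot'."""
    match = REFERENCE.search(definition)
    if match is None:
        raise ValueError(f"No ADMIN table reference found in: {definition!r}")
    reference_table = match.group(1).upper()
    kind = "post" if POST_TABLE.match(reference_table) else "snapshot"
    return reference_table, kind


post_view = "CREATE VIEW V AS SELECT * FROM ADMIN.JOB_A__SCRIPT_1__TABLE_A__POST;"
normal_view = "CREATE VIEW V AS SELECT * FROM ADMIN.TABLE_B WHERE (1 = 0);"
print(classify_view(post_view))
print(classify_view(normal_view))
```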

Our final code to create pre-state tables looked like this:

 

# MAGIC %sql
# MAGIC USE CATALOG $CATALOG;
# MAGIC DROP SCHEMA IF EXISTS $TARGET_SCHEMA;
# MAGIC CREATE SCHEMA $TARGET_SCHEMA;
# MAGIC USE $TARGET_SCHEMA;

# COMMAND ----------

import re

# get a listing of the script's pre views from Netezza
# check the script name in widget should be complete name
script_views = spark.read.format("jdbc") \
  .options(url='jdbc:netezza://<address>:<port>/<backup_env>', 
    user='<username>',
    password='<password>',
    query=f"""SELECT VIEWNAME, DEFINITION FROM _v_view  
          WHERE VIEWNAME LIKE '{JOB.upper()}\_\_{SCRIPT.upper()}\_\_%'
          OR VIEWNAME LIKE '{JOB.upper()}\_\_{SCRIPT_NUM}\_\_%'""", 
    driver='org.netezza.Driver') \
  .load() \
  .collect()

def create_pre_table(definition: tuple[str, str]) -> None:
  table_name = re.search(r'^\w+__\w+__(\w+)__PRE$', definition[0]).group(1)

  print(f"Creating table '{table_name}' from Netezza view: {definition[0]}")
  reference_table = re.search(r'.* FROM ADMIN.(.*)$', definition[1]).group(1).removesuffix(";")

  print(f"\tView references {reference_table}")
  if bool(re.match(r"^\w+__\w+__\w+__POST.*$", reference_table)):
    print("\tHandle as a post table")

    spark.sql(f"""
      CREATE OR REPLACE TABLE {table_name}
      SHALLOW CLONE <historical_data_load_catalog_name>.<historical_data_load_database_name>.{table_name}
      """)

    spark.sql(f"TRUNCATE TABLE {table_name}")

    if not reference_table.endswith("WHERE (1 = 0)"):
      spark.read.table("post_tables." + reference_table).drop("ROWID").write.mode("append").option("mergeSchema", "true").saveAsTable(table_name)

  else:
    print("\tHandle as a normal table")
    sql_query = f"""
      CREATE OR REPLACE TABLE {table_name}
      USING DELTA
      TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported') AS
      {re.sub(r"SELECT .* FROM ADMIN.", f"SELECT * FROM <historical_data_load_catalog_name>.<historical_data_load_database_name>.", definition[1])}
      """
    print(sql_query)
    spark.sql(sql_query)

for view_def in script_views:
  create_pre_table(view_def)

 

Load the views for the script into the catalog

  1. List out the views for that job from your source system using JDBC. They will be views that match the pattern <JOB>__<SCRIPT>__.*__pre.
  2. Handle it the same way you handled post tables, but put it into the actual schema for the test, not just POST_TABLES.

Parting thoughts

In Part 1 of this series we explored considerations and a novel approach to curating robust and thorough test data. With the framework laid out and some sample code in hand, I hope your team is well prepared to execute this approach. Please remember that the code provided is inspiration to get your team thinking about implementation and edge cases. In reality, your code may look different because of the source system and its idiomatic usage.

The most important aspect of this is the conceptual approach, which can be a little difficult to understand and communicate to your stakeholders. My advice is to make sure you really understand and can answer these questions:

  • What's the importance of the run order?
  • What happens if the run order used when modifying scripts is different from the run order used when running the scripts in the source system?
  • How are we going to recover if something runs out of order?
  • Why do we persist VIEWs for the pre state instead of TABLEs?

It's very important to have a detail-oriented team member run the scripts in the source system and keep a log of any issues that arise. If the source system produces any logs, make sure to get access to them, as they can be helpful in triaging issues. I also suggest you elect one team member to be your "test manager" who will develop and maintain the code outlined in this article and the next.

If you've gotten this far and are thinking to yourself, "wow this is a lot, I'm not sure I want to go this route", rest assured: when we started, the code was very simple. As we ran into more edge cases the codebase grew. For a small migration you can avoid the complexity and simply skip the pre-state VIEWs. Just duplicate your data and persist TABLEs! For longer migrations, allocate a few weeks up front to get this going and then convert and run a batch of pipelines every few weeks. That way you have space to triage and fix any issues.

Happy coding!