
Optimising the creation of a change log for transactional sources in an ETL pipeline

scvbelle
New Contributor III

I have multiple transactional sources feeding into my Azure Databricks environment (MySQL, MSSQL, MySQL data dumps) for a client's company-wide data lake. They are basically all "managed sources" (using different DBMSs, receiving application dumps, etc.), but I don't trust that they're protected against schema drift. Because of my lack of faith in the sources, some of which are dependent on external third parties, I want to ingest and store all raw data in a batch ETL process but process only the relevant data. Ideally, I don't want to store a copy of each source table for each ingestion.

Because of the diversity of the data, I don't want to capture change for them all in the same way, so my solution is to keep a log of differences between the current state of the source table and a mirror of the previous ingest. I can then work on choosing how to capture the change for silver/gold on a case by case basis, by propagating changes from the logs into the clean tables.

I've got the current DLT solution below, but I suspect it's very inefficient and that there's a better way of doing things that leverages Spark/Databricks functionality. I am also aware that I can easily run into an OOM or pagination issue because I'm loading whole tables rather than streaming/batching, and I'm not entirely sure how the DLT table will handle a change in the number of columns.

Cost is a significant factor (the client works mainly on donor funding). I've got a Unity Catalog-enabled workspace, but am using core DLT. I'm also concerned about the persistence of the DLT log tables. I'm not even sure if this is a reasonable use case for DLT. Data quantities are actually pretty small (each source is on the order of GBs), but there are one or two monster tables.

Any suggestions for alternative more efficient solutions are welcome!
These are some of the solutions/threads I've been looking at:

Data change feed: https://docs.databricks.com/delta/delta-change-data-feed.html
https://community.databricks.com/t5/data-engineering/what-are-the-best-practices-for-change-data-cap...
https://community.databricks.com/t5/data-engineering/when-should-change-data-feed-be-used/td-p/26000

External management: https://community.databricks.com/t5/data-engineering/how-do-you-capture-change-logs-from-rdms-source...
https://www.databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dm...
https://www.databricks.com/blog/2018/10/29/simplifying-change-data-capture-with-databricks-delta.htm...

DLT CDC: https://docs.gcp.databricks.com/delta-live-tables/cdc.html

Similar discussions re ETL from transactional sources:
https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-...
https://stackoverflow.com/questions/59591536/how-to-compare-two-versions-of-delta-table-to-get-chang...

This is the actual function:

from typing import List, Optional, Dict, Tuple, Set
from dataclasses import dataclass
from pandas import DataFrame as PandasDf
import dlt
from pyspark.sql import DataFrame as SparkDf
from pyspark.sql.column import Column
from pyspark.sql.functions import current_timestamp, lit

@dataclass
class Source:
    name: str
    server_url: str
    user: str
    password: str
    driver: str = "org.mariadb.jdbc.Driver"

def generate_diff_table(
    query: str,
    table_name: str,
    mirror_table_location: str,
    diff_log_location: str,
) -> None:
    """Register a DLT table that logs row-level differences between the
    result of `query` and the mirror saved by the previous ingest."""

    @dlt.table(
        name=f"{table_name}_diff_log",
        comment=f"Log of changes to query: {query}",
        path=diff_log_location,
    )
    def create_diff_table() -> SparkDf:
        timestamp: Column = current_timestamp()
        print(query)
        # `spark` is the notebook's SparkSession; `source` is expected to be a
        # module-level Source instance describing the database being ingested.
        source_df: SparkDf = spark.read.format("jdbc").options(
            driver=source.driver,
            url=f"{source.server_url}",
            query=query,
            user=source.user,
            password=source.password,
            useSSL=True,
            trustServerCertificate=True,
        ).load()
        mirror_path: str = f"{mirror_table_location}.{table_name}_mirror"

        # If the table has been ingested before, use set subtraction to get the
        # differences between the current version and the previous version,
        # and store those differences as a new table (diff_df).
        if spark.catalog.tableExists(mirror_path):
            prev_mirror_df: SparkDf = spark.sql(f"Select * from {mirror_path}")
            # rows in the source but not in the mirror are new
            diff_insert: SparkDf = source_df.subtract(prev_mirror_df).withColumn(colName="action", col=lit("insert")).withColumn(colName="ingest_timestamp", col=lit(timestamp))
            # rows in the mirror but not in the source have been removed
            diff_del: SparkDf = prev_mirror_df.subtract(source_df).withColumn(colName="action", col=lit("delete")).withColumn(colName="ingest_timestamp", col=lit(timestamp))
            diff_df: SparkDf = diff_insert.union(diff_del)
        else:  # if the table has not been ingested before, all rows are essentially inserted
            diff_df = source_df.withColumn(colName="action", col=lit("insert")).withColumn(colName="ingest_timestamp", col=lit(timestamp))

        # replace the mirror with the current state of the source
        source_df.write.mode("overwrite").saveAsTable(mirror_path)
        return diff_df

 

-------------------------------------------------------

Accepted Solutions

scvbelle
New Contributor III

I made the "mistake" of talking to my software engineer partner who majored in pure math about the problem.

They agreed that:
1) bringing both tables into memory is not a good idea, especially as the diff tables will then also be held in memory
2) doing the set subtraction operations is really, really inefficient, especially as it looks like PySpark's subtract is based on SQL EXCEPT DISTINCT, which as far as we could tell does not use indexing, so we assumed that for each row in the base table it iterates through the entire other table checking for a duplicate, and this is then done twice. Not sustainable, not scalable.

Their solution was to process the tables row by row and check for equivalence.

  1. Sort the source and target tables by their primary key (there's already an issue in that I can't sort the source tables, so I have to copy the source to a local table).
  2. Iterate row by row across both tables simultaneously. If there is a mismatch, take the row that has the lower index: this is the row that is missing from one of the tables. If it is from the source table, then it is an added record; if it is from the target table (the reflection of the previous state), then it is a deleted row.
  3. Add this row to a separate log table with the action "insert" or "delete", and add/remove the row from the target table.
  4. Adjust the iterator so that the equivalent rows are processed, and continue to the next row.
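
To make the four steps above concrete, here is a minimal pure-Python sketch of the sorted merge, not a Spark implementation. It assumes both inputs are already sorted, contain no duplicate rows, and contain no NULL/None values (tuple comparison would fail on those). The `(primary_key, payload)` row shape and the name `merge_diff` are hypothetical, chosen only for illustration.

```python
from typing import Iterable, List, Optional, Tuple

Row = Tuple[int, str]  # hypothetical row shape: (primary_key, payload)


def merge_diff(source: Iterable[Row], mirror: Iterable[Row]) -> List[Tuple[str, Row]]:
    """Walk two sorted row sequences once and log inserts and deletes."""
    log: List[Tuple[str, Row]] = []
    src_iter, mir_iter = iter(source), iter(mirror)
    src: Optional[Row] = next(src_iter, None)
    mir: Optional[Row] = next(mir_iter, None)
    while src is not None or mir is not None:
        if mir is None or (src is not None and src < mir):
            # the lower row comes from the source only: it was added
            log.append(("insert", src))
            src = next(src_iter, None)
        elif src is None or mir < src:
            # the lower row comes from the mirror only: it was deleted
            log.append(("delete", mir))
            mir = next(mir_iter, None)
        else:
            # identical rows: nothing to log, advance both sides
            src, mir = next(src_iter, None), next(mir_iter, None)
    return log


previous = [(1, "a"), (2, "b"), (3, "c")]
current = [(1, "a"), (2, "b2"), (4, "d")]
for action, row in merge_diff(current, previous):
    print(action, row)
```

Because whole rows are compared, a changed row (same key, different payload) shows up as a delete of the old version followed by an insert of the new one, which matches what the set-subtraction version logs.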

This solution iterates once through the table, with two sorts and a copy, and runs a query per row, essentially.

My concerns with this approach are mainly that I'd be building my own iterator rather than leveraging the platform's streaming capabilities, and that I'm not sure I can guarantee that all my tables have primary keys to sort by.
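
One possible workaround for tables without a primary key, offered as an untested idea and not something from the thread, is to fingerprint each whole row and compare the fingerprints. The sketch below is plain Python; `row_fingerprint` and `diff_by_fingerprint` are hypothetical names. Note that identical duplicate rows collapse into one entry, which is also what an EXCEPT DISTINCT style subtraction does.

```python
import hashlib
from typing import Iterable, List, Sequence, Tuple


def row_fingerprint(row: Sequence[object]) -> str:
    """Hash all column values of a row into one stable hex digest."""
    # repr() keeps None, 1 and "1" distinct; the unit-separator character
    # stops ("ab", "c") from colliding with ("a", "bc").
    joined = "\x1f".join(repr(value) for value in row)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def diff_by_fingerprint(
    source_rows: Iterable[Sequence[object]],
    mirror_rows: Iterable[Sequence[object]],
) -> Tuple[List[Sequence[object]], List[Sequence[object]]]:
    """Return (inserted_rows, deleted_rows) using fingerprints as surrogate keys."""
    src = {row_fingerprint(r): r for r in source_rows}
    mir = {row_fingerprint(r): r for r in mirror_rows}
    inserts = [src[k] for k in sorted(src.keys() - mir.keys())]
    deletes = [mir[k] for k in sorted(mir.keys() - src.keys())]
    return inserts, deletes


inserted, deleted = diff_by_fingerprint(
    source_rows=[(1, "a"), (None, "x")],
    mirror_rows=[(1, "a"), ("1", "a")],
)
print(inserted)
print(deleted)
```

The fingerprints could also serve as the sort key for a row-by-row walk, at the cost of computing a hash per row on both sides.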

I'm leaving out a lot about autoincrement issues and the relevance of ordering, and what happens with schema drift etc.

A second approach would be to use Structured Streaming's per-row functionality and do a lookup per row of the source table into the target table. If the row exists, add the current date to a "last checked" column or something, such that any rows that have been deleted would have an outdated "last checked" value. Any rows that don't exist can be added with the current date in a "date added" column. This means doing a lookup across the target table per row of the source table. Potentially indexing the target table could help here, I don't know. It also means the source and target tables are different shapes, and I'd need to exclude my metadata columns for the row-wise comparison.
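
The bookkeeping for this second approach can be sketched in plain Python, with a dict standing in for the (indexed) target table. This is only an illustration of the logic under that assumption, not streaming code; `refresh_mirror` and the metadata layout are hypothetical.

```python
from datetime import date
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, str]  # hypothetical row shape
Meta = Dict[str, date]  # {"date_added": ..., "last_checked": ...}


def refresh_mirror(source_rows: Iterable[Row], mirror: Dict[Row, Meta], today: date) -> List[Row]:
    """Stamp rows still present in the source, add new ones, return stale rows."""
    for row in source_rows:
        meta = mirror.get(row)  # one lookup per source row
        if meta is None:
            # row not seen before: record when it was added
            mirror[row] = {"date_added": today, "last_checked": today}
        else:
            # row still exists in the source: refresh its stamp
            meta["last_checked"] = today
    # rows not stamped today were absent from the source, i.e. deleted there
    return [row for row, meta in mirror.items() if meta["last_checked"] < today]


mirror: Dict[Row, Meta] = {}
refresh_mirror([(1, "a"), (2, "b")], mirror, date(2023, 7, 1))
deleted = refresh_mirror([(1, "a"), (3, "c")], mirror, date(2023, 7, 2))
print(deleted)
```

Keeping the metadata separate from the row values, as the dict does here, sidesteps the need to exclude metadata columns during the comparison. On Delta tables the same stamping could plausibly be expressed as a single MERGE (update when matched, insert when not matched) instead of a per-row lookup, though that is not tested here.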

Any insights into the relative efficiencies of any of these approaches are still welcome. I think I'll try my partner's approach first and see how it compares to the original approach. Maybe if I have just crazy excess time I'll try out my idea and see what the impact is. If I have any interesting results to share I'll add them here.

I feel like I'm doing a lot of wheel-reinventing, though, so if anyone has a "you're thinking about this completely wrong" comment, I'm all ears.


scvbelle
New Contributor III

This is the implementation of the original post, for interest (excluded from the original post to reduce length):

 

# some blacklisting and schema renamings where necessary (due to conflicts and/or illegal characters)
SourceName=str
SchemaName=str
TableName=str
ColName=str
SourceSchemaName = str
TargetSchemaName = str

excl_schemas: Dict[SourceName, List[SchemaName]] = {
    "my_first_source": [
        "information_schema",
        "sys",
        "performance_schema",
        "mysql",
        "redcap",
    ],
    "my_snd_source": [...
    ]
}

excl_tables: Dict[SourceName, Dict[SchemaName, Set[TableName]]] = {
    'my_first_source': {},
    'my_snd_source': {},
    'data_dump': {
        "datadump": {"table_logstore_huge_log_file"}
    }
}

#todo: write to table as record of what has been excluded

rename_schemas: Dict[SourceName, Dict[SourceSchemaName, TargetSchemaName]] = {
    "my_first_source": {
            "my-schema": "my_schema",
            "main": "mys_source_main",
           },
    }

 

 

 

# create a log  and update the mirror of the source schema info
def python_to_sql_list(py_list: List[str]) -> str:
    """converts a python list into a sql-parsable list """
    return "('" + "', '".join(py_list) + "')"

schemas_to_exclude: List[str] = excl_schemas[current_source]
excl_schema_str: str = python_to_sql_list(schemas_to_exclude)

info_schema_query: str = f"select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, COLUMN_KEY from information_schema.columns WHERE TABLE_SCHEMA not in {excl_schema_str}"

generate_diff_table(query=info_schema_query,
                table_name=f"info_sch_{current_source}",
                mirror_table_location="pipeline_utils.info_schema_change_logs",
                diff_log_location="pipeline_utils.info_schema_change_logs")

# get a list of the tables to ingest from the mirror of the info_schema
tables_to_ingest: List[Tuple[str, List[str]]] = spark.sql(
    f"Select distinct TABLE_SCHEMA, TABLE_NAME FROM pipeline_utils.info_schemas_mirror.{current_source} WHERE TABLE_SCHEMA NOT IN {excl_schema_str}"
    ).rdd.groupByKey().mapValues(list).collect()

 

 

 

# The actual ingestion bit
# count tables (not schemas) so the progress counter below is meaningful
n_total: int = sum(len(table_list) for _, table_list in tables_to_ingest)
i: int = 1

# Iterate through each table to update the diff log and the local mirror
for schema_name, table_list in tables_to_ingest:
    # deal with renaming as required; fall back to the source name if no rename is listed
    source_schema_name = schema_name
    target_schema_name = rename_schemas.get(current_source, {}).get(source_schema_name, source_schema_name)
    # get any tables listed for exclusion, else return empty set.
    tables_to_exclude: Set[str] = excl_tables[current_source].get(source_schema_name, set())

    # process each table
    for table_name in table_list:
        print(f"{i}/{n_total}")  # keep track of progress
        i += 1
        if table_name not in tables_to_exclude:
            generate_diff_table(
                query=f"select * from {source_schema_name}.{table_name}",
                table_name=f"{schema_name}_{table_name}",
                mirror_table_location=f"bronze.{target_schema_name}",
                diff_log_location=f"bronze.{target_schema_name}",
            )

 

 
