07-12-2023 03:00 AM - edited 07-19-2023 01:40 AM
I have multiple transactional sources feeding into my Azure Databricks env (MySQL, MSSQL, MySQL data dumps) for a client's company-wide data lake. They are basically all "managed sources" (using different DBMSs, receiving application dumps, etc.), but I don't trust that they're protected against schema drift, and I want to ingest/store all raw data in a batch ETL process (because of my lack of faith in the sources, some of which are dependent on external third parties) but process only the relevant data. Ideally, I don't want to store a copy of each source table for each ingestion.
Because of the diversity of the data, I don't want to capture change for them all in the same way, so my solution is to keep a log of differences between the current state of the source table and a mirror of the previous ingest. I can then choose how to capture the change for silver/gold on a case-by-case basis, by propagating changes from the logs into the clean tables.
I've got the current DLT solution below, but I suspect it's very inefficient and that there's a better way of doing things that leverages Spark/Databricks functionality. I'm also aware that I can easily run into an OOM or pagination issue because I'm loading whole tables rather than streaming/batching, and I'm not entirely sure how the DLT table will handle a change in the number of columns.
Cost is a significant factor (the client works mainly on donor funding). I've got a Unity Catalog-enabled workspace, but am using core DLT. I'm also concerned about the persistence of the DLT log tables. I'm not even sure if this is a reasonable use case for DLT. Data quantities are actually pretty small (each source on the order of GBs), but there are one or two monster tables.
Any suggestions for alternative more efficient solutions are welcome!
These are some of the solutions/threads I've been looking at:
Data change feed (quick sketch after this list): https://docs.databricks.com/delta/delta-change-data-feed.html
https://community.databricks.com/t5/data-engineering/what-are-the-best-practices-for-change-data-cap...
https://community.databricks.com/t5/data-engineering/when-should-change-data-feed-be-used/td-p/26000
External management: https://community.databricks.com/t5/data-engineering/how-do-you-capture-change-logs-from-rdms-source...
https://www.databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dm...
https://www.databricks.com/blog/2018/10/29/simplifying-change-data-capture-with-databricks-delta.htm...
DLT CDC: https://docs.gcp.databricks.com/delta-live-tables/cdc.html
Similar discussions re ETL from transactional sources:
https://community.databricks.com/t5/data-engineering/implement-autoloader-to-ingest-data-into-delta-...
https://stackoverflow.com/questions/59591536/how-to-compare-two-versions-of-delta-table-to-get-chang...
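For reference, this is roughly what the change data feed route from the first link looks like (the table name and starting version here are just placeholders, not from my pipeline):

# Rough sketch of the Delta change data feed route; table name/version are placeholders.
# 1) Turn on CDF for an existing Delta table:
spark.sql(
    "ALTER TABLE bronze.some_table "
    "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

# 2) Read the row-level changes recorded since a given table version:
changes = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .table("bronze.some_table")
)
changes.select("_change_type", "_commit_version", "_commit_timestamp").show()

The catch for my case is that CDF only records changes made to the Delta table itself, so the JDBC data still has to be landed/merged into a Delta table before there is anything for it to capture.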
This is the actual function:
from typing import List, Optional, Dict, Tuple, Set
from dataclasses import dataclass

import dlt
from pyspark.sql import DataFrame as SparkDf
from pyspark.sql.column import Column
from pyspark.sql.functions import current_timestamp, lit


@dataclass
class Source:
    name: str
    server_url: str
    user: str
    password: str
    driver: str = "org.mariadb.jdbc.Driver"


def generate_diff_table(
    source: Source,
    query: str,
    table_name: str,
    mirror_table_location: str,
    diff_log_location: str,
) -> None:
    @dlt.table(
        name=f"{table_name}_diff_log",
        comment=f"Log of changes to query: {query}",
        path=diff_log_location,
    )
    def create_diff_table():
        timestamp: Column = current_timestamp()
        print(query)
        # Pull the full current state of the source table over JDBC
        source_df: SparkDf = spark.read.format("jdbc").options(
            driver=source.driver,
            url=f"{source.server_url}",
            query=query,
            user=source.user,
            password=source.password,
            useSSL=True,
            trustServerCertificate=True,
        ).load()
        mirror_path: str = f"{mirror_table_location}.{table_name}_mirror"
        if spark.catalog.tableExists(mirror_path):
            # The table has been ingested before: do set subtraction to get the
            # differences between the current version and the previous version,
            # and store the differences as a new table (diff_df)
            prev_mirror_df: SparkDf = spark.sql(f"SELECT * FROM {mirror_path}")
            diff_insert: SparkDf = (
                source_df.subtract(prev_mirror_df)
                .withColumn("action", lit("insert"))
                .withColumn("ingest_timestamp", timestamp)
            )
            diff_del: SparkDf = (
                prev_mirror_df.subtract(source_df)
                .withColumn("action", lit("delete"))
                .withColumn("ingest_timestamp", timestamp)
            )
            diff_df: SparkDf = diff_insert.union(diff_del)
        else:
            # The table has not been ingested before, so all rows are essentially inserts
            diff_df = (
                source_df
                .withColumn("action", lit("insert"))
                .withColumn("ingest_timestamp", timestamp)
            )
        # Overwrite the mirror so the next run diffs against the current state
        source_df.write.mode("overwrite").saveAsTable(mirror_path)
        return diff_df
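A side note on the set subtraction in the function above: subtract behaves like SQL EXCEPT DISTINCT, so exact duplicate rows collapse to a single row in the diff. If duplicates matter, exceptAll preserves multiplicity; a rough sketch using the same DataFrame names as above:

# Sketch only: same source_df / prev_mirror_df as in the function above.
# exceptAll() keeps duplicates, so a row occurring 3 times now and 2 times in
# the mirror appears once in the diff instead of disappearing entirely.
diff_insert = source_df.exceptAll(prev_mirror_df).withColumn("action", lit("insert"))
diff_del = prev_mirror_df.exceptAll(source_df).withColumn("action", lit("delete"))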
-------------------------------------------------------
07-19-2023 01:37 AM
I made the "mistake" of talking to my software engineer partner who majored in pure math about the problem.
They agreed that
1) bringing both tables into memory is not a good idea, especially as the diff tables will then also be held in memory
2) doing the set subtraction operations is really, really inefficient, especially as it looks like the PySpark subtract is based on SQL EXCEPT DISTINCT, which does not use indexing, so for each row in the base table it iterates through the entire other table checking for a duplicate, and this is then done twice. Not sustainable, not scalable.
Their solution was to process the tables row by row and check for equivalence.
This solution iterates once through the table, with two sorts and a copy, and essentially runs a query per row.
My concerns with this approach are mainly that I'd be building my own iterator, rather than leveraging platform streaming capabilities, and that I'm not sure I can guarantee that all my tables have primary keys to sort by.
I'm leaving out a lot about autoincrement issues and the relevance of ordering, and what happens with schema drift etc.
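For comparison, this is roughly what that row-by-row/sorted comparison looks like if the sorting is pushed down to Spark as a full outer join on a key instead of a hand-rolled iterator. It's only a sketch: it assumes a primary key column (called "id" here purely for illustration) and that both tables have the same columns.

from pyspark.sql import functions as F

# Sketch only: assumes a primary key column "id" (illustrative) and identical
# columns in both DataFrames. The join pushes the sort/shuffle down to Spark
# instead of iterating row by row in Python.
def key_based_diff(source_df, prev_mirror_df, key="id"):
    value_cols = [c for c in source_df.columns if c != key]

    # Hash the non-key columns so "has this row changed?" is a single comparison
    src = source_df.withColumn("_row_hash", F.hash(*value_cols)).alias("src")
    prev = prev_mirror_df.withColumn("_row_hash", F.hash(*value_cols)).alias("prev")

    joined = src.join(prev, F.col(f"src.{key}") == F.col(f"prev.{key}"), "full_outer")

    return (
        joined.withColumn(
            "action",
            F.when(F.col(f"prev.{key}").isNull(), F.lit("insert"))
            .when(F.col(f"src.{key}").isNull(), F.lit("delete"))
            .when(F.col("src._row_hash") != F.col("prev._row_hash"), F.lit("update"))
            .otherwise(F.lit("unchanged")),
        )
        .filter("action != 'unchanged'")
        .withColumn("ingest_timestamp", F.current_timestamp())
    )

An updated row then shows up once as "update" rather than as a delete + insert pair, which would also make the downstream silver/gold propagation simpler.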
A second approach would be to use Structured Streaming per-row functionality and do a lookup per row of the source table into the target table: if the row exists, set a "last checked" column to the current date, so that any rows deleted upstream end up with an outdated "last checked" value; any rows that don't exist can be inserted with the current date in a "date added" column. This means doing a lookup across the target table for every row of the source table. Potentially indexing the target table could help here, I don't know. It also means the source and target tables are different shapes, so I'd need to exclude my metadata columns from the row-wise comparison.
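For the record, that second idea maps fairly naturally onto a Delta MERGE, which keeps it set-based rather than literally one lookup per row. Again just a sketch: the "id" key, the mirror table name, and the metadata column names are placeholders.

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Sketch only: "id", the table name, and the metadata columns are placeholders.
# Existing rows get last_checked refreshed; new rows are inserted with both
# last_checked and date_added set; rows deleted upstream are the ones whose
# last_checked ends up older than this run.
incoming = (
    source_df
    .withColumn("last_checked", F.current_timestamp())
    .withColumn("date_added", F.current_timestamp())
)

target = DeltaTable.forName(spark, "bronze.some_table_mirror")

(
    target.alias("t")
    .merge(incoming.alias("s"), "t.id = s.id")
    .whenMatchedUpdate(set={"last_checked": "s.last_checked"})
    .whenNotMatchedInsertAll()
    .execute()
)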
Any insights into the relative efficiencies of any of these approaches are still welcome. I think I'll try my partner's approach first and see how it compares to the original approach. Maybe if I have just crazy excess time I'll try out my idea as well and see what the impact is. If I have any interesting results to share, I'll add them here.
I feel like I'm doing a lot of wheel-reinventing, though, so if anyone has a "you're thinking about this completely wrong" comment, I'm all ears.
07-19-2023 01:42 AM - edited 07-19-2023 01:42 AM
This is the implementation of the original post, for interest (excluded from the original post to reduce length):
# Some blacklisting and schema renamings where necessary (due to conflicts and/or illegal characters)
SourceName = str
SchemaName = str
TableName = str
ColName = str
SourceSchemaName = str
TargetSchemaName = str

excl_schemas: Dict[SourceName, List[SchemaName]] = {
    "my_first_source": [
        "information_schema",
        "sys",
        "performance_schema",
        "mysql",
        "redcap",
    ],
    "my_snd_source": [
        ...
    ],
}

excl_tables: Dict[SourceName, Dict[SchemaName, Set[TableName]]] = {
    "my_first_source": {},
    "my_snd_source": {},
    "data_dump": {
        "datadump": {"table_logstore_huge_log_file"},
    },
}
# TODO: write to a table as a record of what has been excluded

rename_schemas: Dict[SourceName, Dict[SourceSchemaName, TargetSchemaName]] = {
    "my_first_source": {
        "my-schema": "my_schema",
        "main": "mys_source_main",
    },
}


def python_to_sql_list(py_list: List[str]) -> str:
    """Converts a python list into a SQL-parsable list."""
    return "('" + "', '".join(py_list) + "')"


# Create a log and update the mirror of the source schema info.
# `current_source` (the name of the source being ingested) and `source` (its
# Source connection details) are defined elsewhere in the pipeline code.
schemas_to_exclude: List[str] = excl_schemas[current_source]
excl_schema_str: str = python_to_sql_list(schemas_to_exclude)
info_schema_query: str = (
    "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, COLUMN_KEY "
    f"FROM information_schema.columns WHERE TABLE_SCHEMA NOT IN {excl_schema_str}"
)
generate_diff_table(
    source=source,
    query=info_schema_query,
    table_name=f"info_sch_{current_source}",
    mirror_table_location="pipeline_utils.info_schema_change_logs",
    diff_log_location="pipeline_utils.info_schema_change_logs",
)

# Get a list of the tables to ingest from the mirror of the info_schema
tables_to_ingest: List[Tuple[str, List[str]]] = spark.sql(
    f"SELECT DISTINCT TABLE_SCHEMA, TABLE_NAME "
    f"FROM pipeline_utils.info_schemas_mirror.{current_source} "
    f"WHERE TABLE_SCHEMA NOT IN {excl_schema_str}"
).rdd.groupByKey().mapValues(list).collect()

# The actual ingestion bit
n_total: int = sum(len(tables) for _, tables in tables_to_ingest)  # total table count
i: int = 1
# Iterate through each table to update the diff log and the local mirror
for source_schema_name, table_list in tables_to_ingest:
    # Deal with renaming as required
    if source_schema_name in rename_schemas.get(current_source, {}):
        target_schema_name = rename_schemas[current_source][source_schema_name]
    else:
        target_schema_name = source_schema_name
    # Get any tables listed for exclusion, else an empty set
    tables_to_exclude: Set[str] = excl_tables[current_source].get(source_schema_name, set())
    # Process each table
    for table_name in table_list:
        print(f"{i}/{n_total}")  # keep track of progress
        i += 1
        if table_name not in tables_to_exclude:
            generate_diff_table(
                source=source,
                query=f"SELECT * FROM {source_schema_name}.{table_name}",
                table_name=f"{target_schema_name}_{table_name}",
                mirror_table_location=f"bronze.{target_schema_name}",
                diff_log_location=f"bronze.{target_schema_name}",
            )