CDC logs from AWS DMS are not applied correctly

11-28-2022 06:54 PM
I have an AWS DMS task that runs both the full load and ongoing replication from the source (MSSQL) to the target (AWS S3). I then use Delta Lake to apply the CDC logs.

I have a notebook that continuously inserts data into MSSQL (with id as the primary key), and then I delete some records directly in MSSQL. Delta Lake works perfectly when there are only inserts, but not with deletes: only some of the deleted records, seemingly at random, get applied to Delta Lake.

For example, I first insert 10,000 records, and Delta Lake applies the inserts perfectly. But when I delete some records (id > 8000) from MSSQL and count the records in Delta Lake, the deletes are not applied correctly (the count is greater than 8,000).

Am I missing a step?
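To pin down what "applied correctly" means for the example above, here is a small pure-Python sketch (no Spark or DMS involved; expected_ids and unapplied_deletes are hypothetical helpers). It computes the ids that should survive after inserting ids 1 to 10,000 and deleting every id > 8000, and lists any ids a target still holds although they were deleted at the source. Comparing id sets rather than only counts shows exactly which deletes were missed.

```python
from typing import Iterable, List, Set


def expected_ids(inserted: Iterable[int], deleted: Iterable[int]) -> Set[int]:
    """Ids that should remain at the source after the deletes."""
    return set(inserted) - set(deleted)


def unapplied_deletes(source_ids: Iterable[int], target_ids: Iterable[int]) -> List[int]:
    """Ids still present in the target but no longer present at the source."""
    return sorted(set(target_ids) - set(source_ids))


inserted = range(1, 10_001)
deleted = [i for i in inserted if i > 8000]
source_ids = expected_ids(inserted, deleted)
print(len(source_ids))  # 8000

# Hypothetical target state in which two of the deletes were not applied
target_ids = source_ids | {8001, 9500}
print(unapplied_deletes(source_ids, target_ids))  # [8001, 9500]
```

With Spark, the same comparison could be done as a left anti join of the Delta table's ids against the SQL Server table's ids, which avoids collecting both sets to the driver.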
1. Continuous insert job
# Databricks notebook source
# This script loads sample data into a SQL Server database.
# COMMAND ----------
# MAGIC %pip install Faker
# COMMAND ----------
from faker import Faker
from pyspark.sql import SparkSession, functions, DataFrame
from pyspark.sql.types import StructField, StructType, DecimalType
from typing import List
# COMMAND ----------
spark = SparkSession.builder \
    .appName("ExamplePySparkSubmitTask") \
    .config("spark.databricks.hive.metastore.glueCatalog.enabled", "true") \
    .enableHiveSupport() \
    .getOrCreate()
# COMMAND ----------
faker = Faker()
# COMMAND ----------
def create_sample_profiles(
    _faker: Faker
) -> List[dict]:
    """Create 1,000 sample profiles with Faker."""
    profiles = [_faker.profile() for _ in range(1000)]
    return profiles
# COMMAND ----------
def read_profiles_as_dataframe(
    _spark: SparkSession,
    _profiles: List[dict]
) -> DataFrame:
    """Read the profiles as a DataFrame."""
    # Assumed: the original call was cut off; createDataFrame infers the schema from the dicts
    df = _spark \
        .createDataFrame(_profiles)
    return df
# COMMAND ----------
def process_data(
    _df: DataFrame,
    _count: int
) -> DataFrame:
    """Add an id and the derived columns required downstream."""
    processed_df = _df \
        .coalesce(1) \
        .withColumn('id', (1000 * _count) + (functions.monotonically_increasing_id() + 1)) \
        .withColumn('age', functions.round(functions.rand() * 130)) \
        .withColumn('latitude', functions.col('current_location._1')) \
        .withColumn('longitude', functions.col('current_location._2')) \
        .withColumn('website', functions.concat_ws(',', 'website')) \
        .drop('current_location')  # assumed: JDBC cannot write the struct column
    return processed_df
# COMMAND ----------
def write_dataframe_to_sql_server(
    _df: DataFrame
) -> None:
    """Append the DataFrame to the profiles table in SQL Server."""
    jdbc_options = {
        "url": '<URL>',
        "user": 'admin',
        "password": '<Password>',
        "dbtable": 'profiles'
    }
    _df \
        .write \
        .format('jdbc') \
        .options(**jdbc_options) \
        .mode('append') \
        .save()
# COMMAND ----------
def main(
    _count: int
) -> None:
    """Generate one batch of 1,000 profiles and append it to SQL Server."""
    profiles = create_sample_profiles(faker)
    df = read_profiles_as_dataframe(spark, profiles)
    processed_df = process_data(df, _count)
    write_dataframe_to_sql_server(processed_df)
# COMMAND ----------
if __name__ == '__main__':
    main(0)  # assumed: the body of this cell was not shown in the post
# COMMAND ----------
import time
# Hypothetical pause between batches; the original interval was not shown in the post
SLEEP_SECONDS = 60
count = 0
while count < 5:
    count += 1
    main(count)
    time.sleep(SLEEP_SECONDS)
# COMMAND ----------
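The id column in process_data is 1000 * count + monotonically_increasing_id() + 1. Because the DataFrame is coalesced to a single partition first, monotonically_increasing_id() yields 0 to 999 for a batch of 1,000 profiles, so each batch gets its own contiguous block of ids. The pure-Python sketch below (batch_ids is a hypothetical helper that mirrors the formula; it is not Spark code) checks that the blocks for batch numbers 0 to 5 do not overlap, i.e. that id stays a valid primary key.

```python
from typing import List


def batch_ids(count: int, batch_size: int = 1000) -> range:
    """Ids produced by 1000 * count + (0..batch_size-1) + 1, mirroring process_data."""
    return range(1000 * count + 1, 1000 * count + batch_size + 1)


batches: List[range] = [batch_ids(c) for c in range(6)]
all_ids = [i for b in batches for i in b]
print(batches[1][0], batches[1][-1])  # 1001 2000
print(len(all_ids) == len(set(all_ids)))  # True
```

The multiplier 1000 only works because each batch has exactly 1,000 rows: with a batch size of 1,001, batch 0 would end at id 1001 and batch 1 would start at id 1001, producing a duplicate key.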
2. DLT job
# Databricks notebook source
# This script applies the DMS CDC files with Delta Live Tables.
# COMMAND ----------
import dlt
from pyspark.sql.functions import col, expr
# COMMAND ----------
@dlt.table  # assumed: the decorator line was not shown in the post
# @dlt.expect("valid age", "age > 0 and age <= 100")
def profiles_changes():
    schema = 'Op STRING, cdc_load_timestamp TIMESTAMP, address STRING, birthdate DATE, company STRING, job STRING, mail STRING, name STRING, residence STRING, ssn STRING, username STRING, website STRING, id LONG, age DOUBLE, latitude Decimal(38,18), longitude Decimal(38,18)'
    return spark \
        .readStream \
        .format('cloudFiles') \
        .option("cloudFiles.format", "parquet") \
        .option("cloudFiles.schemaHints", schema) \
        .schema(schema) \
        .load('<S3 path>')  # the DMS output path was not shown in the post
# COMMAND ----------
dlt.create_streaming_live_table("profiles")  # assumed: the target declaration was not shown
# COMMAND ----------
dlt.apply_changes(
    target = "profiles",
    source = "profiles_changes",
    keys = ["id"],
    sequence_by = col("cdc_load_timestamp"),
    apply_as_deletes = expr("Op = 'D'"),
    apply_as_truncates = expr("Op = 'T'"),
    except_column_list = ["Op", "cdc_load_timestamp"],
    stored_as_scd_type = 1
)
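apply_changes with keys = ["id"] and sequence_by = col("cdc_load_timestamp") orders the change events for each id by that column. The following is a deliberately simplified pure-Python model of SCD type 1 semantics, assuming the event with the highest sequence value per key wins and a winning delete removes the row; it is not Databricks' implementation, and apply_scd1 is a hypothetical name. Under this model, a delete only removes a row if its sequence value is later than every other event for that id.

```python
from typing import Dict, Iterable, Optional, Tuple

# (id, sequence value, Op, payload); Op uses the DMS letters 'I', 'U', 'D',
# or None for rows that carry no Op value (modelling full-load rows without one)
Event = Tuple[int, int, Optional[str], str]


def apply_scd1(events: Iterable[Event]) -> Dict[int, str]:
    """Keep the latest event per id by sequence value; drop ids whose latest event is a delete."""
    latest: Dict[int, Tuple[int, Optional[str], str]] = {}
    for key, seq, op, payload in events:
        if key not in latest or seq > latest[key][0]:
            latest[key] = (seq, op, payload)
    return {key: payload for key, (_, op, payload) in latest.items() if op != "D"}


events = [
    (1, 10, "I", "row 1"),
    (1, 20, "D", ""),  # delete sequenced after the insert: id 1 is removed
    (2, 30, "I", "row 2"),
    (2, 25, "D", ""),  # delete sequenced before the insert: id 2 survives
]
print(apply_scd1(events))  # {2: 'row 2'}
```

One way to relate this to the real pipeline would be to look up, for an id that should have been deleted, the cdc_load_timestamp values of its insert and delete records in the DMS files.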
# Observed results
Delta Lake does not apply the CDC logs correctly: the row counts in MSSQL and Delta Lake differ.
# Expected results
Delta Lake should apply the CDC logs correctly, so the row counts in MSSQL and Delta Lake match.
# Environment information
- Databricks Runtime version: 11.3
- Spark version: 3.3.0
- Scala version: 2.12.14