04-06-2023 07:43 AM
OK. So I think I'm probably missing the obvious and tying myself in knots here.
Here is the scenario: complete student datasets arrive as JSON files in the data lake, Autoloader picks them up, the rows are extracted and de-duplicated, and the results feed an SCD Type 2 table (see the pipeline code below).
This is all working nicely.
Now, I need to identify when a record has been removed (deleted) and no longer appears in the batches.
In the SCD table, this should result in the status of that record changing from "current" to "former"
I am not managing to achieve this.
Here is the relevant pipeline code:
# Autoloader picks up the files
import dlt

@dlt.table
def currStudents_streamFiles():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("abfss://*****@********.dfs.core.windows.net/current-students/restAPI")
        .select("*", "_metadata", "_metadata.file_modification_time")
    )
-- extract the rows from the JSON (each file is a complete dataset with a few hundred rows)
CREATE OR REFRESH STREAMING LIVE TABLE currStudents_ingest
AS SELECT
  col.*
  ,"current" AS status
  ,file_modification_time
FROM (
  SELECT fi.file_modification_time, EXPLODE_OUTER(fi.students)
  FROM STREAM(LIVE.currStudents_streamFiles) AS fi
)
WHERE col.id IS NOT NULL
;
# run some de-duplication due to most records being identical each time
import dlt
from pyspark.sql.functions import lit

@dlt.table
def currStudents_dedup():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    return (
        df.dropDuplicates([col for col in df.columns if col != "file_modification_time"])
        .select("*")
        .withColumn("status", lit("current"))
    )
-- capture SCD2 change history
CREATE OR REFRESH STREAMING LIVE TABLE students_SCD;

APPLY CHANGES INTO
  live.students_SCD
FROM
  STREAM(live.currStudents_dedup)
KEYS
  (id)
SEQUENCE BY
  file_modification_time
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT (file_modification_time)
-- match the latest batch (from json file) against "current" version and identify missing records
-- attempt to push identified records back through SCD with a new status "former"
-- DLT pipeline doesn't like INSERT in the apply changes into ....
CREATE TEMPORARY LIVE VIEW former_students_view AS
SELECT *, "former" AS status
FROM (
  -- all records from the last batch processed
  SELECT *
  FROM STREAM(live.currStudents_ingest)
  WHERE file_modification_time = (
    SELECT MAX(file_modification_time) FROM STREAM(live.currStudents_streamFiles)
  )
) t1
WHERE NOT EXISTS (
  -- "current" version of the table held in Databricks
  SELECT 1
  FROM (
    SELECT schoolId FROM STREAM(live.students_SCD)
    WHERE `__END_AT` IS NULL AND status != "former"
  ) t2
  WHERE t1.schoolId = t2.schoolId
);

APPLY CHANGES INTO live.currStudents_dedup
FROM former_students_view
KEYS (schoolId)
INSERT ALL
All help gratefully received.
I am aware that I may be going about this in the wrong way.
04-09-2023 08:40 AM
@Kearon McNicol :
It seems like you want to capture when a record has been removed (deleted) from the batch datasets that arrive in JSON format in an Azure Data Lake. One approach you can take is to use Delta Lake and leverage its merge functionality to identify and update records in the SCD2 table with a status of "former" for records that no longer appear in the batch datasets.
Here's an example of how you can modify your pipeline code to achieve this:
import pyspark.sql.functions as F
import dlt

@dlt.table
def currStudents_streamFiles():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("**/restAPI")
        .select("*", "_metadata", "_metadata.file_modification_time")
    )

@dlt.table
def currStudents_ingest():
    df = (
        spark.readStream.format("delta").table("live.currStudents_streamFiles")
        .select(F.explode_outer("students").alias("data"), "file_modification_time")
        .select("data.*", "file_modification_time")
        .dropDuplicates()
        .withColumn("status", F.lit("current"))
    )
    return df

@dlt.table
def students_SCD():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    return df.writeStream.format("delta").table("live.students_SCD")

@dlt.table
def former_students_view():
    latest_batch_df = (
        spark.readStream.format("delta").table("live.currStudents_ingest")
        .groupBy().agg(F.max("file_modification_time").alias("file_modification_time"))
    )
    current_df = (
        spark.readStream.format("delta").table("live.students_SCD")
        .filter(F.col("__END_AT").isNull())
        .filter(F.col("status") != "former")
        .select("schoolId")
    )
    return (
        latest_batch_df.join(current_df, F.col("schoolId") == F.col("schoolId"), "left_anti")
        .withColumn("status", F.lit("former"))
    )

students_SCD_data = students_SCD()
former_students_data = former_students_view()

(
    students_SCD_data
    .merge(
        former_students_data,
        "schoolId"
    )
    .whenMatchedUpdate(
        set={
            "status": F.col("merge_action").getField("status")
        }
    )
    .execute()
)
In the above code, we use Delta Lake to create tables for the current students ingest, students SCD, and former students view. The current students ingest table is created by reading in the data from the batch datasets and dropping duplicates. The students SCD table is created by reading in the data from the current students ingest table and applying the SCD2 processing. The former students view table is created by identifying records in the latest batch that are not present in the current students SCD table, and marking them with a status of "former". We then use the Delta Lake merge functionality to update the records in the SCD2 table with the new status.
I hope this helps!
04-12-2023 03:43 AM
@Suteja Kanuri. Thank you once again. I'm reviewing your reply now.
I realise I left data lake account info in the code I copied. While it's not a major risk, would you mind removing it from your reply, please? Thank you.
04-14-2023 09:52 AM
@Kearon McNicol: Sure, done. Cheers!
04-12-2023 07:40 AM
Hi @Suteja Kanuri,
Thanks again.
I get this error:
AttributeError: 'Dataset' object has no attribute 'merge',Map(),Map(),List(),List(),Map())
04-14-2023 12:56 AM
It seems my DLT pipeline doesn't support the merge operation. Do I need to change a setting or update my runtime?
04-14-2023 09:54 AM
@Kearon McNicol :
The merge operation is only available in Databricks Runtime 7.0 and later versions. If you are running an earlier version, you will need to upgrade your runtime to use the merge operation.
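For reference, outside of a DLT pipeline the merge API hangs off DeltaTable rather than a DataFrame/Dataset, which is usually what an error like "'Dataset' object has no attribute 'merge'" points at. A minimal sketch of the call shape only; the table and source names below are hypothetical, not taken from your pipeline:
from delta.tables import DeltaTable
import pyspark.sql.functions as F

# Hypothetical target table and source DataFrame, purely to illustrate the API
target = DeltaTable.forName(spark, "students_SCD")
updates_df = spark.read.table("former_students")

(
    target.alias("t")
    .merge(updates_df.alias("s"), "t.schoolId = s.schoolId")   # join on the business key
    .whenMatchedUpdate(set={"status": F.lit("former")})        # flag matched rows as "former"
    .execute()
)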
04-17-2023 02:13 AM
Hi @Suteja Kanuri
My pipeline is running DBR 12.1.
Is it because we're trying to do a merge with tables that are streaming live tables?
In case I'm missing something, here is my pipeline config:
{
  "id": "**********************************",
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 2,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": true,
  "channel": "PREVIEW",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Repos/************/************/DLT/attendance/regPeriods/dlt_reg_periods_python"
      }
    },
    .
    .
    .
    .
  ],
  "name": "CABS-Streaming-Processes_Dev",
  "storage": "dbfs:/pipelines/************************",
  "configuration": {
    "pipelines.enableTrackHistory": "true",
    "spark.databricks.secureVariableSubstitute.enabled": "false",
    "spark.databaseUser": "{{secrets/***********/**************}}",
    "spark.databasePassword": "{{secrets/**********/*******************}}",
    "spark.databricks.delta.schema.autoMerge.enabled": "true"
  },
  "target": "CABS_dataProcessing_Dev"
}
04-17-2023 05:52 AM
@Kearon McNicol :
Based on the configuration you provided, it appears that your pipeline is using Delta Lake (as indicated by the "spark.databricks.delta.schema.autoMerge.enabled": "true" configuration parameter), which supports the merge operation. However, it is possible that there are other factors at play that are causing the merge operation to fail.
One possibility is that the merge operation is encountering conflicts due to concurrent updates from streaming sources. Delta Lake allows concurrent writes from streaming and batch sources, but when multiple writers try to update the same records at the same time, conflicts may arise. Delta Lake provides different conflict resolution strategies to handle these scenarios. You may want to consider using Delta Lake's time travel feature to perform merges on snapshots of the table, rather than on the live table. This ensures that the merge operation does not conflict with ongoing writes to the table.
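To illustrate the time travel idea, here is a minimal sketch of reading a fixed snapshot to use as a stable merge source; the version number and storage path are hypothetical:
# Read a pinned snapshot of a Delta table so the merge source does not change
# underneath you while the stream keeps writing. Version and path are hypothetical.
snapshot_df = (
    spark.read.format("delta")
    .option("versionAsOf", 42)
    .load("dbfs:/pipelines/<pipeline-storage>/tables/currStudents_ingest")
)
# Alternatively, pin by timestamp instead of version:
# .option("timestampAsOf", "2023-04-16T00:00:00")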
Another possibility is that the version of Delta Lake in DBR 12.1 does not support certain features required for the merge operation. You may want to check the release notes for Delta Lake to see if there are any known issues with the version included in DBR 12.1 that could be causing the merge operation to fail. If this is the case, upgrading to a newer version of DBR may resolve the issue.
04-25-2023 06:54 AM
Hi @Suteja Kanuri ,
DBR 12.1 supports and adds to MERGE functionality.
My guess is that the problem lies in my trying to use the table that feeds the SCD table to identify changes, so it is somewhat circular and conflicts could arise.
Trying to get my head round it now that I'm back in the office (I've been away a lot).
04-27-2023 11:36 AM
@Kearon McNicol :
It sounds like you are trying to use the table that feeds the SCD table as the source for identifying changes in the data. If I understand correctly, this could lead to circular dependencies and conflicts.
To avoid these issues, you may want to consider using a separate source table or view to identify changes in the data. This source table or view should be created based on the business logic for identifying changes and should not have any circular dependencies with the SCD table or the source data.
Once you have the source table or view in place, you can use the MERGE statement in DBR 12.1 to efficiently update the SCD table based on the changes identified in the source table or view.
Keep in mind that the MERGE statement requires careful consideration of the join conditions and update logic to avoid conflicts and ensure correct results. It may be helpful to test the MERGE statement on a small subset of data before running it on the full dataset.
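As a rough sketch of what that could look like once a separate change view exists (the table name, view name, and conditions below are hypothetical, not taken from your pipeline):
# Hypothetical MERGE: close out open SCD2 rows as "former" when the key appears
# in a separately built change view. Adjust the join and update logic to your model.
spark.sql("""
    MERGE INTO students_SCD AS t
    USING former_students_changes AS s
    ON  t.schoolId = s.schoolId
    AND t.`__END_AT` IS NULL
    WHEN MATCHED AND t.status <> 'former' THEN
      UPDATE SET t.status = 'former'
""")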
04-12-2023 12:44 AM
Hi @Kearon McNicol
Thank you for posting your question in our community! We are happy to assist you.
To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?
This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance!