cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Process batches in a streaming pipeline - identifying deletes

Kearon
New Contributor III

OK. So I think I'm probably missing the obvious and tying myself in knots here.

Here is the scenario:

  1. batch datasets arrive in json format in an Azure data lake
  2. each batch is a complete set of "current" records (the complete table)
  3. these are processed using autoloader in a streaming pipeline (because we have other related processes running streaming data pipelines and because autoloader makes life easy)
  4. duplicated data is removed from extracted rows
  5. changes are captured via built-in DLT SCD2 processing

This all working nicely.

Now, I need to identify when a record has been removed (deleted) and no longer appears in the batches.

In the SCD table, this should result in the status of that record changing from "current" to "former"

I am not managing to achieve this.

Here is the relevant pipeline code:

#autoloader picks up files
import dlt
@dlt.table
def currStudents_streamFiles():
        return (
            spark.readStream.format("cloudFiles")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.inferColumnTypes", "true")
                .load("abfss://*****@********.dfs.core.windows.net/current-students/restAPI")
                .select("*","_metadata", "_metadata.file_modification_time")
        )
 
-- extract the rows from the json (each file is a complete dataset with a few hundred rows)
CREATE OR REFRESH STREAMING LIVE TABLE currStudents_ingest
AS SELECT
  col.*
  ,"current" AS status
  ,file_modification_time
FROM  (
SELECT fi.file_modification_time, EXPLODE_OUTER (fi.students)
FROM STREAM(LIVE.currStudents_streamFiles) AS fi 
)
WHERE col.id IS NOT NULL
;
 
#run some de-duplication due to most records being identical each time
import dlt
from pyspark.sql.functions import lit
 
@dlt.table
def currStudents_dedup():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    return (
        df.dropDuplicates([col for col in df.columns if col != "file_modification_time"])
        .select('*')
        .withColumn('status', lit('current'))
    )
 
-- capture SCD2 change history
CREATE OR REFRESH STREAMING LIVE TABLE students_SCD;
 
APPLY CHANGES INTO
  live.currStudents_SCD
FROM
  STREAM(live.currStudents_dedup)
KEYS
  (id)
SEQUENCE BY
  file_modification_time
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT (file_modification_time)
 
 
-- match the latest batch (from json file) against "current" version and identify missing records
-- attempt to push identified records back through SCD with a new status "former"
--  DLT pipeline doesn't like INSERT in the apply changes into ....
CREATE TEMPORARY LIVE VIEW former_students_view AS
SELECT *, "former" AS status
-- all records from the last batch processed
FROM (
    SELECT *
    FROM STREAM(live.currStudents_ingest)
    WHERE file_modification_time = (
        SELECT MAX(file_modification_time) FROM STREAM(live.currstudents_streamfiles)
    )
) t1
WHERE NOT EXISTS (
 
  -- "current" version of the table held in Databricks
    SELECT 1
    FROM (
        SELECT schoolId FROM STREAM(live.students_SCD)
        WHERE `__END_AT` IS NULL AND status != "former"
    ) t2
    WHERE t1.schoolId = t2.schoolId
);
 
APPLY CHANGES INTO live.currStudents_dedup
FROM former_students_view
KEYS (schoolId)
INSERT ALL

All help gratefully received.

I am aware that I may be going about this in the wrong way.

11 REPLIES 11

Anonymous
Not applicable

@Kearon McNicol​ :

It seems like you want to capture when a record has been removed (deleted) from the batch datasets that arrive in JSON format in an Azure Data Lake. One approach you can take is to use Delta Lake and leverage its merge functionality to identify and update records in the SCD2 table with a status of "former" for records that no longer appear in the batch datasets.

Here's an example of how you can modify your pipeline code to achieve this:

import pyspark.sql.functions as F
import dlt
 
@dlt.table
def currStudents_streamFiles():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("**/restAPI")
            .select("*","_metadata", "_metadata.file_modification_time")
    )
 
@dlt.table
def currStudents_ingest():
    df = (
        spark.readStream.format("delta").table("live.currStudents_streamFiles")
            .select(F.explode_outer("students").alias("data"), "file_modification_time")
            .select("data.*", "file_modification_time")
            .dropDuplicates()
            .withColumn("status", F.lit("current"))
    )
    return df
 
@dlt.table
def students_SCD():
    df = spark.readStream.format("delta").table("live.currStudents_ingest")
    return df.writeStream.format("delta").table("live.students_SCD")
 
@dlt.table
def former_students_view():
    latest_batch_df = (
        spark.readStream.format("delta").table("live.currStudents_ingest")
            .groupBy().agg(F.max("file_modification_time").alias("file_modification_time"))
    )
    current_df = (
        spark.readStream.format("delta").table("live.students_SCD")
            .filter(F.col("__END_AT").isNull())
            .filter(F.col("status") != "former")
            .select("schoolId")
    )
    return (
        latest_batch_df.join(current_df, F.col("schoolId") == F.col("schoolId"), "left_anti")
            .withColumn("status", F.lit("former"))
    )
 
students_SCD_data = students_SCD()
former_students_data = former_students_view()
 
(
    students_SCD_data
    .merge(
        former_students_data,
        "schoolId"
    )
    .whenMatchedUpdate(
        set={
            "status": F.col("merge_action").getField("status")
        }
    )
    .execute()
)
 

In the above code, we use Delta Lake to create tables for the current students ingest, students SCD, and former students view. The current students ingest table is created by reading in the data from the batch datasets and dropping duplicates. The students SCD table is created by reading in the data from the current students ingest table and applying the SCD2 processing. The former students view table is created by identifying records in the latest batch that are not present in the current students SCD table, and marking them with a status of "former". We then use the Delta Lake merge functionality to update the records in the SCD2 table with the new status.

I hope this helps!

Kearon
New Contributor III

@Suteja Kanuri​ . Thank you once again. I'm reviewing your reply now.

I realise I left data lake account info in my code I copied. While it's not a major risk, would you mind removing it from your reply please? Thank you.

Anonymous
Not applicable

@Kearon McNicol​ :Sure, done, cheers

Kearon
New Contributor III

Hi @Suteja Kanuri​,

Thanks again.

I get this error:

AttributeError: 'Dataset' object has no attribute 'merge',Map(),Map(),List(),List(),Map())

Kearon
New Contributor III

It seems my DLT pipeline doesn't support the merge operation. Do I need to change a setting or update my runtime?

Anonymous
Not applicable

@Kearon McNicol​ :

The merge. operation is only available in Databricks Runtime 7.0 and later versions. If you are running an earlier version, you will need to upgrade your runtime to use the merge operation.

Kearon
New Contributor III

Hi @Suteja Kanuri​ 

My pipeline is running DBR 12.1.

Is it because we're trying to do a merge with tables that are streaming live tables?

In case I'm missing something, here is my pipeline config:

{
    "id": "**********************************",
    "clusters": [
        {
            "label": "default",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 2,
                "mode": "ENHANCED"
            }
        }
    ],
    "development": true,
    "continuous": true,
    "channel": "PREVIEW",
    "edition": "ADVANCED",
    "photon": false,
    "libraries": [
        {
            "notebook": {
                "path": "/Repos/************/************/DLT/attendance/regPeriods/dlt_reg_periods_python"
            }
        },
        .
        .
        .
       .
 
        }
    ],
    "name": "CABS-Streaming-Processes_Dev",
    "storage": "dbfs:/pipelines/************************",
    "configuration": {
        "pipelines.enableTrackHistory": "true",
        "spark.databricks.secureVariableSubstitute.enabled": "false",
        "spark.databaseUser": "{{secrets/***********/**************}}",
        "spark.databasePassword": "{{secrets/**********/*******************}}",
        "spark.databricks.delta.schema.autoMerge.enabled": "true"
    },
    "target": "CABS_dataProcessing_Dev"
}

Anonymous
Not applicable

@Kearon McNicol​ :

Based on the configuration you provided, it appears that your pipeline is using Delta Lake (as indicated by the

"spark.databricks.delta.schema.autoMerge.enabled": "true"

configuration parameter) which supports the merge operation. However, it is possible that there are other factors at play that are causing the merge operation to fail.

One possibility is that the merge operation is encountering conflicts due to concurrent updates from streaming sources. Delta Lake allows concurrent writes from streaming and batch sources, but when multiple writers try to update the same records at the same time, conflicts may arise. Delta Lake provides different conflict resolution strategies to handle these scenarios. You may want to consider using Delta Lake's time travel feature to perform merges on snapshots of the table, rather than on the live table. This ensures that the merge operation does not conflict with ongoing writes to the table.

Another possibility is that the version of Delta Lake in DBR 12.1 does not support certain features required for the merge operation. You may want to check the release notes for Delta Lake to see if there are any known issues with the version included in DBR 12.1 that could be causing the merge operation to fail. If this is the case, upgrading to a newer version of DBR may resolve the issue.

Kearon
New Contributor III

Hi @Suteja Kanuri​ ,

DBR 12.1 supports and adds to MERGE functionality.

My guess is that the problem lies in me trying to use the table feeding the SCD table to identify changes - so it is somewhat circular - and conflicts could arise.

Trying to get my head round it now I'm back in the office (been away a lot)

Anonymous
Not applicable

@Kearon McNicol​ :

It sounds like you are trying to use the table that feeds the SCD table as the source for identifying changes in the data. If I understand correctly, this could lead to circular dependencies and conflicts.

To avoid these issues, you may want to consider using a separate source table or view to identify changes in the data. This source table or view should be created based on the business logic for identifying changes and should not have any circular dependencies with the SCD table or the source data.

Once you have the source table or view in place, you can use the MERGE statement in DBR 12.1 to efficiently update the SCD table based on the changes identified in the source table or view.

Keep in mind that the MERGE statement requires careful consideration of the join conditions and update logic to avoid conflicts and ensure correct results. It may be helpful to test the MERGE statement on a small subset of data before running it on the full dataset.

Anonymous
Not applicable

Hi @Kearon McNicol​ 

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.