cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Need help with DLT Pipeline

Fatimah-Tariq
New Contributor III

I have a DLT pipeline running daily for months and recently found out one issue in my silver layer code and as a result of that, now I have faulty data in my silver schema. Please note that the tables in Silver schema are streaming tables handled within the context of DLT. 
I want to delete the faulty records from my silver schema and keep the pipeline running with the correct code as it's running normally. Could someone please suggest me what would be the best possible approach for me to update my silver schema streaming tables by deleting faulty records? Early response would be highly appreciated.

5 REPLIES 5

saurabh18cs
Valued Contributor

Use Delta Lake's DELETE command to remove the faulty records from your Silver tables. You can do this in a Databricks notebook or a separate script.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeleteFaultyRecords").getOrCreate()

# Define the criteria for faulty records
faulty_criteria = "your_faulty_criteria_here"

# List of Silver tables to clean
silver_tables = ["silver_table1", "silver_table2"]

for table in silver_tables:
spark.sql(f"DELETE FROM {table} WHERE {faulty_criteria}")

Also, Ensure that your DLT pipeline code is corrected to prevent future faulty records. Update the transformation logic in your Silver layer to handle the data correctly. (spark dataframe transformations or constraints)

AngadSingh
New Contributor III

Hi Fatimah, 

You can delete the records from silver layer as long as those records don't get reloaded again (from bronze). More info is here  

May I also understand that are you are using CDC apply_changes method for loading data to silver (like SCD 1)? If not, definitely above can be done.

Cheers

Yes, I'm using CDC apply_changes method with SCD type 1

In that case, what's the expression in the "apply_as_delete" option? Or please share your CDC apply_changes code block?

I do not have "apply_as_delete" option. 
Here's my apply_changes code

dlt.apply_changes(
      target = silver_table,
      source = bronze_dlt_view,
      keys = primary_keys,
      sequence_by = col(sequence_col),
      stored_as_scd_type = 1,
      except_column_list = ["extract_datetime_utc", "_rescued_data", "dlt_extract_datetime_utc"]
    )

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group