
Is it possible to reprocess only a portion of a streaming table data using DLT?

MauricioS
New Contributor III

Hi all,

Currently I have a standard notebook that takes two dates as parameters, a start date and an end date. It goes to the source and pulls only that portion of the data, then on the target table it deletes the existing rows if necessary (if data within that range exists) and, using the same date range, inserts the new data.

As you may notice, we also use this process when we need to reprocess back-dated data. For example, if I get a notification that data from two weeks ago was corrected at the source, I run the same process and it updates only the portion between the start date and end date I provide.

I'm trying to transition to streaming tables and DLT pipelines. Is this possible with that functionality? In other words, can I reprocess a streaming table for only a specific portion of the data?

Thanks in advance.

1 REPLY

lingareddy_Alva
Honored Contributor II

Hi @MauricioS 

Yes, you can achieve similar reprocessing functionality with DLT streaming tables, but it requires a different approach than your current batch process. Here are the main strategies:

1. CDC Pattern with Tombstone Records
The most common approach is to send "correction" records through your streaming source:
- When you need to reprocess historical data, publish new records to your stream with updated values
- Use a CDC (Change Data Capture) pattern where each record has an operation type (INSERT, UPDATE, DELETE)
- DLT will process these as new streaming events and apply the changes (see the sketch just below)
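
As a rough sketch of that pattern (names like cdc_source, your_cdc_feed, my_target_table, id, sequence_num, and operation are placeholders, not anything from your pipeline), the CDC feed can be folded into a streaming table with DLT's apply_changes:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def cdc_source():
    # Placeholder source -- swap in your actual CDC feed (Kafka, Auto Loader, etc.)
    return spark.readStream.table("your_cdc_feed")

dlt.create_streaming_table("my_target_table")

dlt.apply_changes(
    target = "my_target_table",
    source = "cdc_source",
    keys = ["id"],                                     # primary key of the target
    sequence_by = col("sequence_num"),                 # ordering column, so late corrections win
    apply_as_deletes = expr("operation = 'DELETE'"),
    except_column_list = ["operation", "sequence_num"]
)

With this setup, a back-dated correction is just another event on the feed: apply_changes upserts it into the target by key, which is the streaming analogue of your delete-then-insert step.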

2. DLT Pipeline Refresh with Filters
You can selectively refresh portions of your pipeline:

# In your DLT pipeline
import dlt
from pyspark.sql.functions import col

# start_date / end_date would come from pipeline configuration (see strategy 4 below)
@dlt.table(
    name="my_streaming_table",
    table_properties={
        # Allows a full refresh (reset) of this table when needed
        "pipelines.reset.allowed": "true"
    }
)
def my_streaming_table():
    return (
        spark.readStream
            .format("your_source")
            .load()
            # This won't work directly: a static filter on a streaming read
            # doesn't re-read data the stream has already processed
            .filter(col("event_date").between(start_date, end_date))
    )


3. Hybrid Approach - Batch + Streaming
A more practical solution is to use a hybrid pattern:
- Keep your current batch notebook for historical reprocessing
- Use DLT streaming for real-time/near-real-time data
- Create a union view that combines both:

import dlt

@dlt.table
def historical_data():
    # Table maintained by your existing batch notebook
    return spark.table("your_batch_processed_table")

@dlt.table
def streaming_data():
    return spark.readStream.format("your_stream").load()

@dlt.table
def unified_view():
    # Both inputs must share the same schema for the union
    return (
        dlt.read("historical_data")
            .union(dlt.read("streaming_data"))
            .dropDuplicates(["key_column", "timestamp"])
    )

 

4. Pipeline Restart with Date Range
You can restart your DLT pipeline with specific configurations:
- Use pipeline parameters to control date ranges (a sketch follows this list)
- Restart the pipeline when you need to reprocess
- This will reprocess the entire pipeline, not just a portion
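
A minimal sketch of the parameter idea, assuming you define configuration keys such as mypipeline.start_date and mypipeline.end_date in the pipeline settings (both key names, the source format, and the reprocessed_slice table are placeholders):

import dlt
from pyspark.sql.functions import col

# Read the date range from the pipeline configuration (set in the pipeline's settings)
start_date = spark.conf.get("mypipeline.start_date", "2024-01-01")
end_date = spark.conf.get("mypipeline.end_date", "2024-12-31")

@dlt.table
def reprocessed_slice():
    # Batch read so the filter applies to the full slice on every (re)run
    return (
        spark.read.format("your_source")
            .load()
            .filter(col("event_date").between(start_date, end_date))
    )

On a full refresh the table is recomputed with whatever dates are currently configured, which matches the point above: the refresh reruns the whole table rather than patching only part of it.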

5. Multiple Streaming Sources
Set up your architecture to handle corrections:

import dlt

@dlt.table
def main_stream():
    # Placeholder broker and topic names -- replace with your own
    return (
        spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "<brokers>")
            .option("subscribe", "main-topic")
            .load()
    )

@dlt.table
def corrections_stream():
    # Separate topic carrying back-dated corrections
    return (
        spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "<brokers>")
            .option("subscribe", "corrections")
            .load()
    )

@dlt.table
def merged_data():
    # Assumes the Kafka value payload has been parsed so an "id" column exists
    return (
        dlt.read_stream("main_stream")
            .union(dlt.read_stream("corrections_stream"))
            .withWatermark("timestamp", "1 hour")
            .dropDuplicates(["id", "timestamp"])
    )

For your use case, I'd suggest:
- Keep your current batch process for historical corrections (it's working well!)
- Add DLT streaming for new, real-time data
- Create a unified view that combines both batch and streaming data
- Use MERGE operations in your final table to handle the combination (sketch below)
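
For the MERGE step, a minimal sketch using the Delta Lake Python API; final_table, id, and event_date are placeholder names for illustration, and combined_updates stands in for the unified batch + streaming data from the hybrid approach above:

from delta.tables import DeltaTable

# Combined batch + streaming data, e.g. the unified view from strategy 3
combined_updates = spark.table("unified_view")

target = DeltaTable.forName(spark, "final_table")

(
    target.alias("t")
    .merge(
        combined_updates.alias("s"),
        "t.id = s.id AND t.event_date = s.event_date"
    )
    .whenMatchedUpdateAll()     # overwrite corrected rows in place
    .whenNotMatchedInsertAll()  # add rows that are new to the target
    .execute()
)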

 

LR