Hi @MauricioS
Yes, you can achieve similar reprocessing functionality with DLT streaming tables,
but it requires a different approach than your current batch process. Here are the main strategies:
1. CDC Pattern with Tombstone Records
The most common approach is to send "correction" records through your streaming source:
- When you need to reprocess historical data, publish new records to your stream with updated values
- Use a CDC (Change Data Capture) pattern where each record has an operation type (INSERT, UPDATE, DELETE)
- DLT will process these as new streaming events and apply the changes; the APPLY CHANGES API (dlt.apply_changes) is built for exactly this pattern (see the sketch below)
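If your correction records carry an operation type and an ordering column, apply_changes can handle the upserts/deletes for you. A minimal sketch, assuming a source feed named customers_cdc_feed with customer_id, event_timestamp, and operation columns (all names are placeholders; older DLT runtimes use dlt.create_target_table instead of dlt.create_streaming_table):

import dlt
from pyspark.sql.functions import col, expr

# Target streaming table that DLT keeps in sync with the change feed
dlt.create_streaming_table("customers_clean")

dlt.apply_changes(
    target="customers_clean",
    source="customers_cdc_feed",          # stream carrying INSERT/UPDATE/DELETE records
    keys=["customer_id"],                 # business key(s) used to match rows
    sequence_by=col("event_timestamp"),   # ordering column so late corrections win correctly
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation"],     # keep the CDC metadata out of the target table
)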
2. DLT Pipeline Refresh with Filters
You can selectively refresh portions of your pipeline (a sketch of triggering a selective refresh follows the code below):
# In your DLT pipeline
import dlt
from pyspark.sql.functions import col

# Example bounds; in practice these could come from pipeline parameters (see strategy 4)
start_date, end_date = "2024-01-01", "2024-01-31"

@dlt.table(
    name="my_streaming_table",
    table_properties={
        # Allows this table to be fully refreshed (reset) during a pipeline update
        "pipelines.reset.allowed": "true"
    }
)
def my_streaming_table():
    return (
        spark.readStream
        .format("your_source")
        .load()
        # Caveat: this filter only limits newly arriving records; on its own it does
        # not reprocess data the stream has already consumed.
        .filter(col("event_date").between(start_date, end_date))
    )
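Note that pipelines.reset.allowed only controls whether a table may be fully refreshed; the selective refresh itself is triggered from outside the pipeline code, either in the UI or via the Pipelines REST API. A hedged sketch of a full refresh of just one table through the API (host, token, and pipeline ID are placeholders, and the request fields should be double-checked against the Pipelines API docs for your workspace):

import requests

host = "https://<your-workspace-host>"
pipeline_id = "<your-pipeline-id>"

resp = requests.post(
    f"{host}/api/2.0/pipelines/{pipeline_id}/updates",
    headers={"Authorization": "Bearer <your-token>"},
    # full_refresh_selection limits the full refresh to the listed tables
    json={"full_refresh_selection": ["my_streaming_table"]},
)
resp.raise_for_status()
print(resp.json())  # contains the update_id of the triggered refresh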
3. Hybrid Approach - Batch + Streaming
A more practical solution is to use a hybrid pattern:
- Keep your current batch notebook for historical reprocessing
- Use DLT streaming for real-time/near-real-time data
- Create a union view that combines both:
import dlt

@dlt.table
def historical_data():
    # Output of your existing batch reprocessing notebook
    return spark.table("your_batch_processed_table")

@dlt.table
def streaming_data():
    return spark.readStream.format("your_stream").load()

@dlt.table
def unified_view():
    # Assumes both inputs share the same schema and column order
    return (
        dlt.read("historical_data")
        .union(dlt.read("streaming_data"))
        .dropDuplicates(["key_column", "timestamp"])
    )
4. Pipeline Restart with Date Range
You can restart your DLT pipeline with specific configurations:
- Use pipeline parameters to control date ranges (see the sketch after this list)
- Restart the pipeline with a full refresh when you need to reprocess
- Keep in mind that this reprocesses the entire pipeline, not just a portion of the data
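A sketch of wiring pipeline parameters into a table definition; the configuration keys (reprocess.start_date / reprocess.end_date) are made-up names you would set in the pipeline's Configuration settings:

import dlt
from pyspark.sql.functions import col

# Read date bounds from the pipeline configuration, with wide-open defaults
start_date = spark.conf.get("reprocess.start_date", "1900-01-01")
end_date = spark.conf.get("reprocess.end_date", "9999-12-31")

@dlt.table(name="events_in_range")
def events_in_range():
    # On a full refresh the source is re-read from scratch and this filter bounds
    # what lands in the table; on a normal update it only filters newly arriving data.
    return (
        spark.readStream.format("your_source").load()
        .filter(col("event_date").between(start_date, end_date))
    )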
5. Multiple Streaming Sources
Set up your architecture to handle corrections:
import dlt

@dlt.table
def main_stream():
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "<your-brokers>")
        .option("subscribe", "your_main_topic")
        .load()
    )

@dlt.table
def corrections_stream():
    # "subscribe" (not "topic") selects the topic when reading from Kafka
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "<your-brokers>")
        .option("subscribe", "corrections")
        .load()
    )

@dlt.table
def merged_data():
    # Assumes the Kafka payload has been parsed so "id" and "timestamp" columns exist
    return (
        dlt.read_stream("main_stream")
        .union(dlt.read_stream("corrections_stream"))
        .withWatermark("timestamp", "1 hour")
        .dropDuplicates(["id", "timestamp"])
    )
For your use case, I'd suggest:
- Keep your current batch process for historical corrections (it's working well!)
- Add DLT streaming for new, real-time data
- Create a unified view that combines both batch and streaming data
- Use MERGE operations in your final table to handle the combination (see the sketch below)
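If that final table is a regular Delta table maintained outside the DLT pipeline (for example, written from your batch notebook), the merge could look like this. A minimal sketch; final_table, corrections, and id are placeholder names:

from delta.tables import DeltaTable

# Reprocessed/corrected rows produced by the batch job (placeholder table name)
corrections_df = spark.table("corrections")

(
    DeltaTable.forName(spark, "final_table").alias("t")
    .merge(corrections_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()      # overwrite existing rows with corrected values
    .whenNotMatchedInsertAll()   # insert rows that were missing
    .execute()
)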
LR