Hi @rrajan,
The simplest solution is to check the max timestamp in each gold table and use it to filter the silver source data on every incremental run. Here's how you can handle this (this query would be the source of your MERGE statement on each run):
SELECT * FROM silver AS s
WHERE s.last_update_timestamp
> (SELECT MAX(last_update_timestamp) FROM gold_table_1)
Similarly for the other gold tables.
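For example, a minimal sketch of the full statement for one gold table (the key column id and the UPDATE/INSERT clauses are just illustrative; adapt them to your schema):
MERGE INTO gold_table_1 AS g
USING (
  -- only silver rows newer than what gold_table_1 already holds
  SELECT * FROM silver AS s
  WHERE s.last_update_timestamp >
        (SELECT MAX(last_update_timestamp) FROM gold_table_1)
) AS src
ON g.id = src.id  -- assumed business key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
Note that MAX(last_update_timestamp) is NULL while the gold table is still empty, so for the very first load you'd either do a plain insert or wrap the subquery in COALESCE with an old sentinel timestamp.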
This way:
- If one gold table fails, other tables' timestamps remain unchanged
- On retry, you'll only process records newer than what's already in each gold table
- No need for additional tracking tables or complex logic
For more robust pipelines, you could consider:
1. A separate metadata table, so you avoid scanning the full gold tables just to find the max timestamp:
CREATE TABLE metadata_control (
  table_name STRING,
  last_processed_timestamp TIMESTAMP
)
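You'd seed it with one row per gold table; a sketch with illustrative table names and a sentinel timestamp so the first run picks up everything:
INSERT INTO metadata_control VALUES
  ('gold_table_1', TIMESTAMP '1900-01-01'),
  ('gold_table_2', TIMESTAMP '1900-01-01')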
In this scenario, when loading each gold table, you filter your silver source data based on the last processed timestamp for that specific gold table:
SELECT * FROM silver
WHERE last_update_timestamp > (
  SELECT last_processed_timestamp
  FROM metadata_control
  WHERE table_name = 'gold_table_1'
)
After successful processing of each gold table, you update its timestamp in the metadata table. This ensures that if a later table fails, you won't reprocess already loaded data on retry.
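A minimal sketch of that bookkeeping step for one gold table, reusing the DDL above (a MERGE also creates the row if you skipped the seeding step):
MERGE INTO metadata_control AS m
USING (
  SELECT 'gold_table_1' AS table_name,
         MAX(last_update_timestamp) AS last_processed_timestamp
  FROM gold_table_1
) AS u
ON m.table_name = u.table_name
WHEN MATCHED THEN UPDATE SET m.last_processed_timestamp = u.last_processed_timestamp
WHEN NOT MATCHED THEN INSERT (table_name, last_processed_timestamp)
  VALUES (u.table_name, u.last_processed_timestamp)
Run it only after the corresponding gold MERGE succeeds, so a failed load never advances that table's watermark.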
2. As an alternative, you could use Structured Streaming from the silver table with a separate checkpoint for each gold table. The checkpoints give you automatic failure recovery and exactly-once guarantees on the Delta sink. See the Databricks documentation on streaming reads and writes with Delta Lake for details: https://docs.databricks.com/en/structured-streaming/delta-lake.html