Hello,
I am trying to use Delta Live Tables in a production setting, but I am having trouble confirming which batches of data each table in the pipeline has processed.
In the most basic case, imagine I have three tables linked to one another in a chain:
table_1 -> table_2 -> table_3
table_1 will be an Auto Loader instance pointing at blob storage.
table_2 and table_3 will apply subsequent transformations.
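For context, here is a minimal sketch of the kind of pipeline definition I mean (the storage path, table names, and transformations are placeholders, not my real code):

```python
import dlt
from pyspark.sql import functions as F

# Placeholder landing path for the batched parquet drops described below.
SOURCE_PATH = "abfss://landing@<storage-account>.dfs.core.windows.net/"

@dlt.table(name="table_1")
def table_1():
    # Auto Loader streaming ingest from blob storage.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(SOURCE_PATH)
    )

@dlt.table(name="table_2")
def table_2():
    # Stand-in for the first transformation step.
    return dlt.read_stream("table_1").withColumn("ingested_at", F.current_timestamp())

@dlt.table(name="table_3")
def table_3():
    # Stand-in for the second transformation step.
    return dlt.read_stream("table_2")
```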
The files are dropped into the blob storage location in batched folders with the following folder structure:
.
├── 2024/
│ ├── Aug/
│ │ ├── 19/
│ │ │ ├── 07:05:40/
│ │ │ │ ├── table_1_name/
│ │ │ │ │ ├── table_1_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_1_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_1_name_other_text_0_0_2.snappy.parquet
│ │ │ │ ├── table_2_name/
│ │ │ │ │ ├── table_2_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_2_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_2_name_other_text_0_0_2.snappy.parquet
│ │ │ │ └── ...
│ │ │ ├── 10:07:22/
│ │ │ │ ├── table_1_name/
│ │ │ │ │ ├── table_1_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_1_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_1_name_other_text_0_0_2.snappy.parquet
│ │ │ │ ├── table_2_name/
│ │ │ │ │ ├── table_2_name_other_text_0_0_0.snappy.parquet
│ │ │ │ │ ├── table_2_name_other_text_0_0_1.snappy.parquet
│ │ │ │ │ └── table_2_name_other_text_0_0_2.snappy.parquet
│ │ │ │ └── ...
│ │ │ └── ...
│ │ └── ...
│ └── ...
└── ...
The DLT pipeline will be triggered once a new batch of data has been loaded into the blob storage location.
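(For what it's worth, the trigger itself is roughly the following; this is just a sketch using the Databricks SDK for Python, and the pipeline ID plus the process that detects a newly landed batch folder are assumed.)

```python
from databricks.sdk import WorkspaceClient

# Placeholder pipeline ID; supplied by whatever process notices a new batch folder.
PIPELINE_ID = "<dlt-pipeline-id>"

w = WorkspaceClient()

# Kick off a triggered update of the DLT pipeline for the newly landed batch.
w.pipelines.start_update(pipeline_id=PIPELINE_ID)
```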
I would like to be able to trace the current status of the Delta Live Tables pipeline so that I know all of the tables have processed all of the batches of data and are therefore up to date.
I have looked at cloud_files_state for the Auto Loader instance, and this allows me to easily determine which batches are available and have been discovered by Auto Loader.
In theory I could populate a table with a query like:
```python
import dlt

@dlt.table()
def processed_batches():
    # Distinct batch folders that Auto Loader has discovered for table_1.
    return spark.sql("""
        SELECT DISTINCT folder
        FROM (
            SELECT regexp_extract(path, '^(.+)/', 1) AS folder
            FROM cloud_files_state(TABLE(sandbox.trial.table_1))
        )
    """)
```
but this doesn't work: it fails, complaining that the table doesn't exist. I assume this is because it tries to read that table before the DLT pipeline has created it.
If I weren't using DLT, I would write some functions to create state-management tables: something like the two tables below (see the code sketch after them)...
## table_1_processed_batches:

| runID | source   | BatchID        | added_timestamp |
|-------|----------|----------------|-----------------|
| uuid  | external | 20240101010101 | 20240927010101  |
and then, after a downstream process has run, it would write out its own table:
## table_2_processed_batches:

| runID | source  | BatchID        | added_timestamp |
|-------|---------|----------------|-----------------|
| uuid  | table_1 | 20240101010101 | 20240927010101  |
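A rough sketch of that manual approach outside DLT (the table and function names here are purely illustrative, and `spark` is assumed to be the notebook's session):

```python
import uuid
from pyspark.sql import functions as F

def record_processed_batch(target_table: str, source: str, batch_id: str) -> None:
    """Append one row per processed batch to a state-management table.

    The batch ID would be derived from the dated folder the files arrived in.
    """
    row = spark.createDataFrame(
        [(str(uuid.uuid4()), source, batch_id)],
        "runID string, source string, BatchID string",
    ).withColumn("added_timestamp", F.current_timestamp())
    row.write.mode("append").saveAsTable(target_table)

# After the ingest step finishes a batch:
record_processed_batch("sandbox.trial.table_1_processed_batches", "external", "20240101010101")

# After the downstream step has consumed that batch:
record_processed_batch("sandbox.trial.table_2_processed_batches", "table_1", "20240101010101")
```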
I feel like this should be something that I can get from the built-in tables in DLT, but I have not found a way of achieving it so far. Any help would be greatly appreciated.