Delta Live Tables: reading from output
01-11-2023 11:12 AM
I'm trying to implement incremental ingestion logic in the following way:
1. The database tables have a DbUpdatedDate column.
2. During the initial load I perform a full copy of the database table.
3. During an incremental load I:
3.1. scan the data already in the DLT output to find the most recent DbUpdatedDate we already have (let's call it the high_watermark)
3.2. query the database table to fetch only rows with DbUpdatedDate > high_watermark
3.3. perform unionByName on the historized data and the new increment
Step 3.1 is where I'm having issues - when I try to read from the output, I keep getting errors.
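For illustration, a rough sketch of the intended pattern; the JDBC URL, table name, and INITIAL_RUN flag are simplified placeholders, and reading the table's own output (step 3.1) is exactly the part that fails:

import dlt
from pyspark.sql import functions as F

# Placeholders for illustration only
JDBC_URL = "jdbc:sqlserver://..."
SOURCE_TABLE = "dbo.my_table"
INITIAL_RUN = False  # True for the first full load

@dlt.table
def my_table():
    # Steps 2 / 3.2: read the source database table (credentials omitted)
    increment = (
        spark.read.format("jdbc")
        .option("url", JDBC_URL)
        .option("dbtable", SOURCE_TABLE)
        .load()
    )
    if INITIAL_RUN:
        return increment
    # Step 3.1: find the high watermark in the data we already have
    history = dlt.read("my_table")  # <- this is the read that fails
    high_watermark = history.agg(F.max("DbUpdatedDate")).collect()[0][0]
    # Step 3.2: keep only the new rows
    increment = increment.filter(F.col("DbUpdatedDate") > F.lit(high_watermark))
    # Step 3.3: append the increment to the historized data
    return history.unionByName(increment)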
The minimal example to reproduce it:
import dlt

INITIAL_RUN = True

@dlt.table
def test_table():
    if INITIAL_RUN:
        return spark.createDataFrame([
            {"id": 1, "val": "1"},
            {"id": 2, "val": "2"},
        ])
    else:
        return dlt.read("test_table")

@dlt.table
def test_table_copy():
    df = dlt.read("test_table")
    print(df.collect())
    return df
When INITIAL_RUN is True, everything works fine. But after I flip it to False (having run it beforehand, so the tables exist) I get the following error:
pyspark.sql.utils.AnalysisException: Failed to read dataset 'test_table'. Dataset is defined in the pipeline but could not be resolved.
Same thing happens when I try to use spark.table("LIVE.test_table")
Is reading from the output a supported scenario? If not how could I work around this?
Labels: Delta, DLT, Live Tables
01-23-2023 06:29 AM
Hi @Chris Nawara,
I had the same issue you had. I was trying to avoid apply_changes, but in the end I implemented it and I'm happier than I expected hehe.
And if you have any additional standardization columns that you need to implement, you can simply read from the apply_changes table and generate the final table.
My logic is like this:
readStream -> dlt.view based on the dataframe -> dlt.create_streaming_live_table -> dlt.apply_changes (stored_as_scd_type=2) -> dlt.table (I had to create an additional table because I have a few columns to calculate based on the __START_AT and __END_AT provided by apply_changes).
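Not the exact code, but roughly the flow looks like this; the Auto Loader source path, the id key, and the DbUpdatedDate sequencing column are assumptions for illustration:

import dlt
from pyspark.sql import functions as F

# Placeholder ADLS path for illustration
SOURCE_PATH = "abfss://container@account.dfs.core.windows.net/raw/my_table/"

@dlt.view(name="my_table_raw")
def my_table_raw():
    # readStream from ADLS via Auto Loader (format assumed to be parquet)
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .load(SOURCE_PATH)
    )

# Target table that apply_changes will populate as SCD type 2
dlt.create_streaming_live_table("my_table_scd2")

dlt.apply_changes(
    target="my_table_scd2",
    source="my_table_raw",
    keys=["id"],
    sequence_by=F.col("DbUpdatedDate"),
    stored_as_scd_type=2,
)

# Final table with extra columns derived from __START_AT / __END_AT
@dlt.table(name="my_table_final")
def my_table_final():
    df = dlt.read("my_table_scd2")
    return df.withColumn("is_current", F.col("__END_AT").isNull())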
01-23-2023 07:12 AM
Hi!
@Felipe Cavalcante are you querying the database directly, or do you have a CDC stream in e.g. Kafka? To put it differently - where is the first readStream reading data from?
Best regards,
Chris
01-31-2023 04:51 AM
Hi @Chris Nawara, we read from ADLS.
BR
02-15-2023 03:58 AM
Hi @Felipe Cavalcante! In my use case I want to read from a database table, so I guess if you're reading from an ADLS location that's a different case.