Delta Live Tables: reading from output
01-11-2023 11:12 AM
I'm trying to implement incremental ingestion logic in the following way:
1. Database tables have a DbUpdatedDate column.
2. During the initial load I perform a full copy of the database table.
3. During an incremental load I:
   3.1. scan the data already in the DLT table to find the most recent DbUpdatedDate that we already have (let's call it high_watermark),
   3.2. query the database table to fetch only rows with DbUpdatedDate > high_watermark,
   3.3. perform unionByName on the historized data and the new increment.
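To make the intended logic concrete, here is a minimal plain-Python sketch of the steps above. It is not the actual pipeline code: lists of dicts stand in for the database table and the DLT table, and the names `high_watermark` (as a function) and `incremental_load` are hypothetical, chosen only for illustration.

```python
from datetime import datetime


def high_watermark(existing_rows):
    """Return the most recent DbUpdatedDate already ingested, or None if nothing is loaded."""
    return max((row["DbUpdatedDate"] for row in existing_rows), default=None)


def incremental_load(existing_rows, source_rows):
    """Hypothetical stand-in for the ingestion step (not a DLT API)."""
    watermark = high_watermark(existing_rows)
    if watermark is None:
        # Initial load: full copy of the source table.
        return list(source_rows)
    # Incremental load: keep only rows newer than the watermark...
    increment = [row for row in source_rows if row["DbUpdatedDate"] > watermark]
    # ...and append them to the historized data (the unionByName step).
    return existing_rows + increment


source = [
    {"id": 1, "DbUpdatedDate": datetime(2023, 1, 1)},
    {"id": 2, "DbUpdatedDate": datetime(2023, 1, 5)},
]
loaded = incremental_load([], source)      # initial run: copies both rows
source.append({"id": 3, "DbUpdatedDate": datetime(2023, 1, 10)})
loaded = incremental_load(loaded, source)  # incremental run: appends only id 3
```

Note that, like a plain union, this sketch appends a changed row as a new record instead of replacing the older version.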
3.1 is where I'm having issues - when I try to read from the output, I keep getting errors.
The minimal example to reproduce it:
import dlt

# `spark` is the session provided by the Databricks runtime.
INITIAL_RUN = True

@dlt.table
def test_table():
    if INITIAL_RUN:
        # Initial load: create the table from static data.
        return spark.createDataFrame([
            {"id": 1, "val": "1"},
            {"id": 2, "val": "2"},
        ])
    else:
        # Subsequent runs: try to read the table's own previous output.
        return dlt.read("test_table")

@dlt.table
def test_table_copy():
    df = dlt.read("test_table")
    print(df.collect())
    return df

When INITIAL_RUN is True, everything works fine. But after I flip it to False (having run the pipeline beforehand, so the tables exist), I get the following error:
pyspark.sql.utils.AnalysisException: Failed to read dataset 'test_table'. Dataset is defined in the pipeline but could not be resolved.

The same thing happens when I try to use spark.table("LIVE.test_table").
Is reading from the output a supported scenario? If not, how could I work around this?
Labels: DLT, Live Tables