I'm trying to implement incremental ingestion logic in the following way:

1. Database tables have a `DbUpdatedDate` column.
2. During the initial load I perform a full copy of the database table.
3. During an incremental load I:
   3.1. scan the data already in the DLT table to find the most recent `DbUpdatedDate` we already have (let's call it `high_watermark`),
   3.2. query the database table to fetch only rows with `DbUpdatedDate > high_watermark`,
   3.3. perform a `unionByName` on the historized data and the new increment (a rough sketch follows this list).
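For reference, the incremental branch I'm aiming for conceptually looks like the sketch below. This is not my real code: the function and table names, the JDBC options, and the naive string interpolation of the watermark are all placeholders.

```python
from pyspark.sql import functions as F

def incremental_load(spark, target_table, jdbc_url, source_table):
    # 3.1: find the highest DbUpdatedDate already historized (the high watermark).
    historized = spark.read.table(target_table)
    high_watermark = historized.agg(F.max("DbUpdatedDate")).collect()[0][0]

    # 3.2: fetch only source rows newer than the watermark; the query is pushed
    # down to the database. Assumes the watermark formats as a valid SQL literal.
    increment = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("query", f"SELECT * FROM {source_table} "
                         f"WHERE DbUpdatedDate > '{high_watermark}'")
        .load()
    )

    # 3.3: combine history with the new increment, matching columns by name.
    return historized.unionByName(increment)
```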
Step 3.1 is where I'm having issues: when I try to read the pipeline's own output, I keep getting errors. A minimal example to reproduce it:
```python
import dlt

INITIAL_RUN = True

@dlt.table
def test_table():
    if INITIAL_RUN:
        return spark.createDataFrame([
            {"id": 1, "val": "1"},
            {"id": 2, "val": "2"},
        ])
    else:
        # On incremental runs, read the table's own previous output back in.
        return dlt.read("test_table")

@dlt.table
def test_table_copy():
    df = dlt.read("test_table")
    print(df.collect())
    return df
```
When `INITIAL_RUN` is `True`, everything works fine. But after I flip it to `False` (having run the pipeline beforehand, so the tables already exist), I get the following error:
```
pyspark.sql.utils.AnalysisException: Failed to read dataset 'test_table'. Dataset is defined in the pipeline but could not be resolved.
```
The same thing happens when I try to use `spark.table("LIVE.test_table")`.
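For example, substituting it for the `dlt.read` call in the downstream table (my placement of the call; the error is the same either way):

```python
@dlt.table
def test_table_copy():
    # Fails with the same "could not be resolved" error as dlt.read("test_table").
    df = spark.table("LIVE.test_table")
    return df
```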
Is reading from a table's own output a supported scenario? If not, how could I work around this?