Delta Live Tables: reading from output

knawara
Contributor

I'm trying to implement an incremental ingestion logic in the following way:

  1. database tables have DbUpdatedDate column
  2. During initial load I perform a full copy of the database table
  3. During incremental load I:
    1. scan the data already in the DLT to see what is the most recent DbUpdatedDate that we already have (let's call it a high_watermark)
    2. query database table to only fetch data with DbUpdatedDate > high_watermark
    3. I perform unionByName on historized data and the new increment

3.1 is where I'm having issues - when I try to read from the output, I keep getting errors.

The minimal example to reproduce it:

import dlt
 
INITIAL_RUN = True
 
@dlt.table
def test_table():
    if INITIAL_RUN:
        return spark.createDataFrame([
            {"id": 1, "val": "1"},
            {"id": 2, "val": "2"},
        ])
    else:
        dlt.read("test_table")
        
@dlt.table
def test_table_copy():
    df = dlt.read("test_table")
    print(df.collect())
    return df

When INITIAL_RUN is True, everything works fine. But after I flip it to False (having run it beforehand, so the tables exist) I get the following error:

pyspark.sql.utils.AnalysisException: Failed to read dataset 'test_table'. Dataset is defined in the pipeline but could not be resolved.

Same thing happens when I try to use spark.table("LIVE.test_table")

Is reading from the output a supported scenario? If not how could I work around this?