
Delta Live Tables: reading from output

knawara
Contributor

I'm trying to implement incremental ingestion logic in the following way:

  1. The database tables have a DbUpdatedDate column
  2. During the initial load I perform a full copy of the database table
  3. During an incremental load I:
    1. scan the data already in the DLT table to find the most recent DbUpdatedDate we already have (let's call it the high_watermark)
    2. query the database table to fetch only rows with DbUpdatedDate > high_watermark
    3. perform unionByName on the historized data and the new increment

Step 3.1 is where I'm having issues: when I try to read from the output, I keep getting errors.
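For context, here is a rough sketch of what steps 3.1-3.3 would look like outside of DLT (the JDBC connection details and table names below are placeholders for illustration, not the real pipeline):

from pyspark.sql import functions as F

# Sketch only: connection details and table names are placeholders
jdbc_url = "jdbc:sqlserver://<host>:1433;databaseName=<db>"

# 3.1: find the most recent DbUpdatedDate already ingested (the high watermark)
historized = spark.table("my_catalog.my_schema.test_table")
high_watermark = historized.agg(F.max("DbUpdatedDate")).collect()[0][0]

# 3.2: fetch only rows newer than the high watermark from the source database
increment = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "dbo.source_table")
    .option("user", "<user>")
    .option("password", "<password>")
    .load()
    .filter(F.col("DbUpdatedDate") > F.lit(high_watermark))
)

# 3.3: union the historized data with the new increment
result = historized.unionByName(increment)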

The minimal example to reproduce it:

import dlt
 
INITIAL_RUN = True  # flip to False after the first pipeline run, once the tables exist
 
@dlt.table
def test_table():
    if INITIAL_RUN:
        return spark.createDataFrame([
            {"id": 1, "val": "1"},
            {"id": 2, "val": "2"},
        ])
    else:
        return dlt.read("test_table")
        
@dlt.table
def test_table_copy():
    df = dlt.read("test_table")
    print(df.collect())
    return df

When INITIAL_RUN is True, everything works fine. But after I flip it to False (having run the pipeline beforehand, so the tables exist), I get the following error:

pyspark.sql.utils.AnalysisException: Failed to read dataset 'test_table'. Dataset is defined in the pipeline but could not be resolved.

The same thing happens when I try to use spark.table("LIVE.test_table").

Is reading from the pipeline's own output a supported scenario? If not, how could I work around this?

4 REPLIES

fecavalc08
New Contributor III

Hi @Chris Nawara,

I had the same issue you had. I was trying to avoid apply_changes, but in the end I implemented it and I'm happier than I expected, hehe.

And if you have any additional standardization columns to implement, you can simply read from the apply_changes table and generate the final table.

My logic is like this:

readStream -> dlt.view based on the dataframe -> dlt.create_streaming_live_table -> dlt.apply_changes (stored_as_scd_type=2) -> dlt.table (I had to create an additional table because I have a few columns calculated based on the __START_AT and __END_AT provided by apply_changes).
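A rough sketch of that flow (the source path/format, table names, keys and sequence column below are assumptions for illustration, not the real pipeline):

import dlt
from pyspark.sql import functions as F

SOURCE_PATH = "abfss://<container>@<account>.dfs.core.windows.net/cdc/"  # placeholder

# 1. Stream the raw change feed into a view (Auto Loader over ADLS in this example)
@dlt.view
def source_changes():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(SOURCE_PATH)
    )

# 2. Declare the target streaming table and apply the changes as SCD type 2
dlt.create_streaming_live_table("customers_scd2")

dlt.apply_changes(
    target="customers_scd2",
    source="source_changes",
    keys=["id"],                         # assumed business key
    sequence_by=F.col("DbUpdatedDate"),  # assumed ordering column
    stored_as_scd_type=2,
)

# 3. Additional table with columns derived from __START_AT / __END_AT
@dlt.table
def customers_latest():
    return (
        dlt.read("customers_scd2")
        .filter(F.col("__END_AT").isNull())
        .withColumn("valid_from", F.col("__START_AT"))
    )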

knawara
Contributor

Hi!

@Felipe Cavalcante, are you querying the database directly? Or do you have a CDC stream in, e.g., Kafka? To put it differently: where is the first readStream reading data from?

Best regards,

Chris

fecavalc08
New Contributor III

Hi @Chris Nawara, we read from ADLS.

BR

knawara
Contributor

Hi @Felipe Cavalcante! In my use case I want to read from a database table, so I guess if you're reading from an ADLS location, that's a different case.