Delta Live Tables: reading from output

knawara
Contributor

I'm trying to implement an incremental ingestion logic in the following way:

  1. Database tables have a DbUpdatedDate column.
  2. During the initial load I perform a full copy of the database table.
  3. During an incremental load I:
    1. scan the data already in the DLT table to find the most recent DbUpdatedDate we already have (let's call it the high_watermark)
    2. query the database table to fetch only the rows with DbUpdatedDate > high_watermark
    3. perform a unionByName on the historized data and the new increment (conceptually, something like the sketch below)
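
A rough sketch of what I have in mind (the source table name is a placeholder and the code is simplified; step 3.1 is the dlt.read of the table's own output):

import dlt
from pyspark.sql import functions as F
 
INITIAL_RUN = False  # flipped to False after the initial full load
 
@dlt.table
def my_table():
    if INITIAL_RUN:
        # initial load: full copy of the source table
        # ("source_db_table" is a placeholder for the actual database table)
        return spark.read.table("source_db_table")
    else:
        # incremental load: find the high watermark in the data we already have
        history = dlt.read("my_table")  # <-- step 3.1, the read that fails
        high_watermark = history.agg(F.max("DbUpdatedDate")).collect()[0][0]
        # fetch only rows newer than the watermark and append them to the history
        increment = (spark.read.table("source_db_table")
                     .filter(F.col("DbUpdatedDate") > high_watermark))
        return history.unionByName(increment)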

Step 3.1 is where I'm having issues: when I try to read from the output, I keep getting errors.

The minimal example to reproduce it:

import dlt
 
INITIAL_RUN = True
 
@dlt.table
def test_table():
    if INITIAL_RUN:
        return spark.createDataFrame([
            {"id": 1, "val": "1"},
            {"id": 2, "val": "2"},
        ])
    else:
        return dlt.read("test_table")
 
@dlt.table
def test_table_copy():
    df = dlt.read("test_table")
    print(df.collect())
    return df

When INITIAL_RUN is True, everything works fine. But after I flip it to False (having run it beforehand, so the tables exist) I get the following error:

pyspark.sql.utils.AnalysisException: Failed to read dataset 'test_table'. Dataset is defined in the pipeline but could not be resolved.

The same thing happens when I try to use spark.table("LIVE.test_table").

Is reading from the output a supported scenario? If not, how could I work around this?

4 REPLIES

fecavalc08
New Contributor III

Hi @Chris Nawara​,

I had the same issue you had. I was trying to avoid apply_changes, but in the end I implemented it and I'm happier than I expected, hehe.

And if you have any additional standardization columns that you need to implement, you can simply read from the apply_changes table and generate the final table from it.

My logic is like this:

readStream -> dlt.view based on the dataframe -> dlt.create_streaming_live_table -> dlt.apply_changes (stored_as_scd_type=2) -> dlt.table (I had to create an additional table because I have a few columns to calculate based on the __START_AT and __END_AT provided by apply_changes)
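
For illustration, a rough sketch of that flow (the table names, key column, and the Auto Loader source path are just examples to show the shape, not my actual code):

import dlt
from pyspark.sql import functions as F
 
# 1. readStream -> dlt.view based on the dataframe
#    (here Auto Loader over an ADLS path; the path and format are placeholders)
@dlt.view
def source_updates():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .load("abfss://raw@<storage-account>.dfs.core.windows.net/my_table"))
 
# 2. declare the streaming live table that apply_changes will write into
dlt.create_streaming_live_table("my_table_scd2")
 
# 3. apply_changes with SCD type 2 history
dlt.apply_changes(
    target="my_table_scd2",
    source="source_updates",
    keys=["id"],
    sequence_by=F.col("DbUpdatedDate"),
    stored_as_scd_type=2
)
 
# 4. an extra dlt.table for the columns calculated from __START_AT / __END_AT
@dlt.table
def my_table_final():
    return (dlt.read("my_table_scd2")
            .withColumn("is_current", F.col("__END_AT").isNull()))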

Hi!

@Felipe Cavalcante​, are you querying the database directly? Or do you have a CDC stream, e.g. in Kafka? To put it differently: where is the first readStream reading data from?

Best regards,

Chris

fecavalc08
New Contributor III

Hi @Chris Nawara​, we read from ADLS.

BR

Hi @Felipe Cavalcante​! In my use case I want to read from a database table, so I guess if you're reading from an ADLS location that's a different scenario.
