
Delta Live Table - not reading the changed record from cloud file

alj_a
New Contributor III

Hi,

I am trying to ingest data from cloud files into a bronze table. The DLT pipeline works the first time and loads the data into the bronze table. But when I add a new record and change a field in an existing record, the pipeline run succeeds yet shows 0 records processed. I expected 1 record inserted and 1 record updated.

My code is below.

 

import dlt
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema for the customer CSV files in the landing zone
schema = StructType(
    [
        StructField('customer_id', StringType(), True),
        StructField('customer_name', StringType(), True),
        StructField('customer_phone', StringType(), True),
        StructField('operation_date', StringType(), True)
    ]
)
path = "/mnt/saphana-adls-landing/saphana-adls-landing/customer_landing"

@dlt.table(comment="load bronze customer table from adls datalake landing zone",
           path="/mnt/saphana-adls-landing/saphana-adls-landing/delta/bronze_customer")
def customer():
    # Stream CSV files from the landing path with Auto Loader (cloudFiles)
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("readchangeFeed", "true")
        .option("ignoreChanges", "true")
        .schema(schema)
        .load(path)
    )

 

 

1 ACCEPTED SOLUTION

Accepted Solutions

Emil_Kaminski
Contributor

Hi, 

Neither a DLT stream nor native Spark Structured Streaming will pick up the fact that a record has changed. They can only read newly arriving data.

1. If you want to keep loading data incrementally and only need to read data that is added, remove this option from your pipeline:

.option("readchangeFeed","true")

and check that your pipeline works by adding a new file to this location:

path="/mnt/saphana-adls-landing/saphana-adls-landing/customer_landing"

2. If you don't care about incremental loading but do care about changed data, you can do a full reload by changing

spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")

to:

spark.read.csv()

3. There is also something called Change Data Feed, but it's more advanced, and I don't think it's what you are looking for. You can read more about it here: https://docs.databricks.com/en/delta/delta-change-data-feed.html
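
To make the difference between options 1 and 2 concrete, here is a rough plain-Python model. It is only a sketch and not how Auto Loader is implemented: it assumes ingestion is tracked per file name, and the names `incremental_load`, `full_reload`, `customers_1.csv` and `customers_2.csv` are made up for illustration.

```python
from typing import Dict, List, Set

Rows = List[dict]

def incremental_load(landing: Dict[str, Rows], seen_files: Set[str]) -> Rows:
    """Return rows only from files not ingested before, then remember those files."""
    new_rows: Rows = []
    for name in sorted(landing):
        if name in seen_files:
            continue  # already ingested: edits inside this file go unnoticed
        new_rows.extend(dict(row) for row in landing[name])
        seen_files.add(name)
    return new_rows

def full_reload(landing: Dict[str, Rows]) -> Rows:
    """Re-read every file on every run, so edited rows are seen along with all the others."""
    return [dict(row) for name in sorted(landing) for row in landing[name]]

# Hypothetical landing zone with one file containing one customer.
landing = {"customers_1.csv": [{"customer_id": "1", "customer_name": "Ann"}]}
seen: Set[str] = set()
first_run = incremental_load(landing, seen)

# Edit a row in place: the file name is unchanged, so the next run finds nothing new.
landing["customers_1.csv"][0]["customer_name"] = "Anna"
second_run = incremental_load(landing, seen)

# Deliver the change as a new file instead: the next run picks it up.
landing["customers_2.csv"] = [{"customer_id": "1", "customer_name": "Anna"}]
third_run = incremental_load(landing, seen)

# A full reload returns every row from every file, old and new versions alike.
reloaded = full_reload(landing)
```

In this model the second run returns nothing, which matches the "0 records processed" symptom in the question, while the full reload returns both files' rows on every run.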

Good Luck

** You might also want to subscribe to Warsaw Databricks YT channel: https://www.youtube.com/channel/UC1-u_2nI97cNHtu_FQ3HP_A


3 REPLIES

alj_a
New Contributor III

Thank you, Emil. I tried all the suggestions. .read works fine: it picks up both new and changed data. But my problem is that the target is a bronze table.

In this case my bronze table ends up with duplicate records.

However, let me look at the other option: creating another intermediate table and applying CDC to it.
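
For anyone following the same route, the idea of applying CDC on top of a bronze table that contains duplicates can be sketched in plain Python. DLT has an `apply_changes` API intended for this kind of processing, but the sketch below does not use it and only models the upsert semantics. It assumes `customer_id` is the key and that `operation_date` is an ISO-8601 string, so that string order matches date order. The function name `upsert_latest` and the sample rows are made up for illustration.

```python
from typing import Dict, Iterable

def upsert_latest(target: Dict[str, dict], changes: Iterable[dict]) -> Dict[str, dict]:
    """Upsert change rows into target, keeping the newest row per customer_id."""
    for row in changes:
        key = row["customer_id"]
        current = target.get(key)
        # Assumption: operation_date is ISO-8601 text, so comparing strings compares dates.
        if current is None or row["operation_date"] >= current["operation_date"]:
            target[key] = dict(row)
    return target

# Hypothetical bronze rows: customer 1 appears twice because a changed record was re-ingested.
bronze = [
    {"customer_id": "1", "customer_name": "Ann", "operation_date": "2023-11-01"},
    {"customer_id": "2", "customer_name": "Bob", "operation_date": "2023-11-01"},
    {"customer_id": "1", "customer_name": "Anna", "operation_date": "2023-11-05"},
]

# The intermediate table keeps one row per customer: the most recent version.
silver = upsert_latest({}, bronze)
```

The bronze table stays append-only and keeps every version it received, and the deduplicated view lives in the intermediate table.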
