11-12-2023 10:49 AM
Hi,
I am trying to ingest data from cloud files into a bronze table. The DLT pipeline works the first time and loads the data into the bronze table, but when I add a new record and change a field in an existing record, the pipeline still succeeds when it should have inserted one record and updated one, yet it shows 0 records processed.
My code is below:
import dlt
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('customer_id', StringType(), True),
        StructField('customer_name', StringType(), True),
        StructField('customer_phone', StringType(), True),
        StructField('operation_date', StringType(), True)
    ]
)

path = "/mnt/saphana-adls-landing/saphana-adls-landing/customer_landing"

@dlt.table(comment="load bronz customer table from adls datalake landing zone",
           path="/mnt/saphana-adls-landing/saphana-adls-landing/delta/bronze_customer")
def customer():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("readchangeFeed", "true")
        .option("ignoreChanges", "true")
        .schema(schema)
        .load(path)
        # df_landing_customer
    )
Labels: Delta Lake
Accepted Solutions
11-12-2023 12:20 PM
Hi,
Neither a DLT stream nor native Spark Structured Streaming will pick up the fact that a record has changed; they can only read newly arriving data.
1. If you want to keep incremental loading and only read newly added data, remove this option from your pipeline:
.option("readchangeFeed","true")
and check whether your pipeline works by adding another file to this location (a short sketch of the pipeline without that option follows this point):
path="/mnt/saphana-adls-landing/saphana-adls-landing/customer_landing"
2. If you don't need incremental loading but you do need to pick up changed data, you can do a full reload by changing
spark.readStream.format("cloudFiles").option("cloudFiles.format", "csv")
to a batch read such as:
spark.read.csv()
(a sketch follows below)
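For example, the full-reload variant could look roughly like this (a sketch that assumes the schema and path variables from your post; in DLT a batch read like this is recomputed on every pipeline update):
@dlt.table(comment="full reload of all customer landing files on every update")
def customer():
    return (
        spark.read                      # batch read instead of Auto Loader
        .option("header", "true")
        .schema(schema)                 # schema from the original post
        .csv(path)                      # landing path from the original post
    )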
3. There is also something called Change Data Feed, but it's more advanced, and I don't think it's what you are looking for. You can read more about it here: https://docs.databricks.com/en/delta/delta-change-data-feed.html
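If you do explore Change Data Feed later, reading it looks roughly like this (a sketch; it assumes a Delta table named source_customer with delta.enableChangeDataFeed set to true, neither of which exists in your post):
# Stream the row-level changes (inserts, updates, deletes) from a Delta table
changes = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")   # valid for Delta sources, not cloudFiles
    .table("source_customer")           # hypothetical CDF-enabled Delta table
)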
Good Luck
11-12-2023 07:06 PM
Thank you Emil. I tried all the suggestions. The .read approach works fine and picks up both new and changed data, but my problem is that the target is a bronze table, and in this case the bronze table ends up with duplicate records.
However, let me look at the other options: creating another intermediate table and applying the CDC on top of it (a rough sketch follows).
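For reference, the usual way to do that in DLT is apply_changes on top of the bronze table. A rough sketch, assuming customer_id is the key, operation_date can order the changes, and silver_customer is a new (hypothetical) target table:
import dlt

# Target table that will hold one up-to-date row per customer
dlt.create_streaming_table("silver_customer")

# Apply the CDC records from the bronze table into the silver table
dlt.apply_changes(
    target="silver_customer",
    source="customer",             # bronze table defined earlier in the pipeline
    keys=["customer_id"],
    sequence_by="operation_date",  # assumes this column orders the changes
    stored_as_scd_type=1           # keep only the latest version of each key
)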

