Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-12-2023 10:49 AM
Hi,
I am trying to ingest the data from cloudfile to bronze table. DLT is working fist time and loading the data into Bronze table. but when i add new record and change a filed in existing record the DLT pipeline goes success but it should be inserted 1 record and updated 1 record but it shows 0 record processed.
my code is below.
schema = StructType(
[
StructField('customer_id', StringType(), True),
StructField('customer_name', StringType(), True),
StructField('customer_phone', StringType(), True),
StructField('operation_date', StringType(), True)
]
)
path="/mnt/saphana-adls-landing/saphana-adls-landing/customer_landing"
@dlt.table(comment="load bronz customer table from adls datalake landing zone",
path="/mnt/saphana-adls-landing/saphana-adls-landing/delta/bronze_customer")
def customer():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("readchangeFeed","true")
.option("ignoreChanges", "true")
.schema(schema)
.load(path)
#df_landing_customer
)