def ingest():
    # spark, schema, and landing_folder_path are assumed to be defined earlier in the notebook
    source_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("timestampFormat", "d-M-y H.m")
        .option("cloudFiles.schemaLocation", f"{landing_folder_path}/Opportunity_schema")
        # .option("cloudFiles.inferColumnTypes", "true")
        .schema(schema)  # Explicitly defined schema to avoid invalid characters
        .load(f"{landing_folder_path}/Opportunity")
    )
    source_df = source_df.filter("Id IS NOT NULL")  # Example of filtering out corrupt rows

    write_query = (source_df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{landing_folder_path}/Opportunity/checkpoint")
        .option("mergeSchema", "false")
        .outputMode("append")
        .trigger(availableNow=True)
        .toTable("dev.demo_db.Opportunity_raw")
    )
    write_query.awaitTermination()  # Block until the availableNow batch finishes

ingest()
I have an issue with Auto Loader: I'm not getting an incremental load with this setup. If I rerun ingest(), some unstructured data gets ingested into Opportunity_raw again.
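
Not a confirmed fix, but one thing worth ruling out: the checkpointLocation sits inside the same folder that cloudFiles is monitoring (f"{landing_folder_path}/Opportunity"), so the metadata files Spark writes under checkpoint/ may themselves get listed as new input on a rerun and land in Opportunity_raw as malformed rows. A minimal sketch of keeping the checkpoint outside the monitored folder, reusing the variables above; the path Opportunity_checkpoint is just an illustrative name, not something from the original setup:

    write_query = (source_df.writeStream
        .format("delta")
        # hypothetical path: keep streaming metadata outside the folder Auto Loader watches
        .option("checkpointLocation", f"{landing_folder_path}/Opportunity_checkpoint")
        .option("mergeSchema", "false")
        .outputMode("append")
        .trigger(availableNow=True)
        .toTable("dev.demo_db.Opportunity_raw")
    )
    write_query.awaitTermination()

Note that pointing at a new checkpoint location resets the stream's file-tracking state, so the first run after such a change would reprocess all existing files once before incremental behavior resumes.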