Hi,
I am running Auto Loader continuously; it checks for new files every minute. I need to store the time each file was received/processed, but the column I add always contains the date when the Auto Loader stream started.
Here is my code:
from datetime import date
from pyspark.sql.functions import lit

df = (spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("cloudFiles.validateOptions", "true")
    .option("cloudFiles.region", "us-east-1")
    .option("cloudFiles.backfillInterval", "1 day")
    .option("cloudFiles.fetchParallelism", 100)
    .option("cloudFiles.useNotifications", "true")
    .schema(streamSchema)
    .load(raw_path)
    .withColumn("process_date", lit(date.today()))
)
(df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", bronze_checkpoint_path)
    .option("path", bronze_path)
    .option("mergeSchema", "true")
    .trigger(processingTime="1 minute")  # or set this to whatever makes sense for the data source
    .start()
)
Appreciate any help.
Regards,
Sanjay