Hello everyone,
I was keen to try out Autoloader's new cleanSource option so that we can easily clean up our landing folder.
However, I found that it has no effect whatsoever. Since I cannot create a support case, I am posting here instead.
A simple streaming job such as
# Auto Loader source; cleanSource is set to move ingested files
# to data_moved two minutes after they are processed
df = (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "s3://my_bucket/data_schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.cleanSource", "move")
    .option("cloudFiles.cleanSource.retentionDuration", "2 minutes")
    .option("cloudFiles.cleanSource.moveDestination", "s3://my_bucket/data_moved")
    .load("s3://my_bucket/data")
)
# Stream into a Delta table with a 10-second trigger
(
    df
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my_bucket/data_checkpoint")
    .option("mergeSchema", "true")
    .queryName("abcd")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .toTable("dev.bronze.tmp_cloud_files_testing")
)
ingests the data put into s3://my_bucket/data without issues; however, the files are never moved as specified, even after waiting for hours. I've tested with a few files as well as with thousands of files.
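For reference, this is how I check the move destination (nothing here beyond dbutils and the paths from above):

# The move destination stays empty, even hours after the retention window has passed
display(dbutils.fs.ls("s3://my_bucket/data_moved"))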
I've tested S3 locations, DBFS locations, and local locations. Autoloader has permission to write to these locations, since I could easily stream into parquet files (minimal sketch below).
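For example, a plain parquet sink pointed at the same bucket works fine (the perm_check paths below are just placeholders for this post):

# Permission sanity check: streaming the same source into plain parquet
# files works, so write access to the bucket is not the problem
(
    df
    .writeStream
    .format("parquet")
    .option("checkpointLocation", "s3://my_bucket/perm_check_checkpoint")
    .option("path", "s3://my_bucket/perm_check")
    .start()
)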
What I find especially suspicious is that
SELECT * FROM cloud_files_state("s3://my_bucket/data_checkpoint")
shows that no archive_mode is set for any of the ingested files.
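The same check run from Python, narrowed to the column in question:

# archive_mode comes back NULL for every file, even though
# cloudFiles.cleanSource is set to "move"
spark.sql(
    'SELECT path, archive_mode '
    'FROM cloud_files_state("s3://my_bucket/data_checkpoint")'
).show(truncate=False)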
Setting cloudFiles.cleanSource to DELETE has no effect either.
All tested on clusters running DBR 16.4 or DBR 17.
Do we have to enable this feature somewhere, or did I implement something incorrectly? I really don't know what to try next.
Thank you
Jan