Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Autoloader cleanSource option does not take effect

janm2
New Contributor II

Hello everyone,

I was very keen to try out the Autoloader's new cleanSource option so we can clean up our landing folder easily.

However, I found that it has no effect whatsoever. As I cannot create a support case, I am creating this post.

A simple streaming job such as 

df = (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "s3://my_bucket/data_schema")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.cleanSource", "move")
    .option("cloudFiles.cleanSource.retentionDuration", "2 minutes")
    .option("cloudFiles.cleanSource.moveDestination", "s3://my_bucket/data_moved")
    .load("s3://my_bucket/data"))
)


(
    df
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my_bucket/data_checkpoint")
    .option("mergeSchema", "true")
    .queryName("abcd")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .table("dev.bronze.tmp_cloud_files_testing")
)


ingests the data placed in s3://my_bucket/data without issues; however, the data is never moved as specified, even after waiting for hours. I've tested with a few files as well as with thousands of files.
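
For completeness, this is roughly how I keep the stream running while waiting for the move; I'm assuming here that the cleanSource maintenance only runs while the query is alive, and query is just a local name in this sketch:

# Sketch, assuming cleanSource maintenance only runs while the stream is up:
# toTable() returns a StreamingQuery handle; blocking on it keeps the stream
# alive well past cloudFiles.cleanSource.retentionDuration.
query = (
    df
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my_bucket/data_checkpoint")
    .option("mergeSchema", "true")
    .queryName("abcd")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .toTable("dev.bronze.tmp_cloud_files_testing")
)
query.awaitTermination()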

I've tested S3 locations, DBFS locations, and local locations. Autoloader has permission to write to these locations, as I could easily stream into Parquet files.
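
As a sanity check, even a trivial batch write to the move destination succeeds. This is only a sketch of the check, and the _permission_check suffix is a made-up path:

# Hypothetical permission check: write a dummy file to the configured
# moveDestination to confirm the cluster can write there.
spark.range(1).write.mode("overwrite").parquet(
    "s3://my_bucket/data_moved/_permission_check"
)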

What I find especially suspicious is that

SELECT * FROM cloud_files_state("s3://my_bucket/data_checkpoint")

shows there is no archive_mode set for the data.
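
The same check from Python, in case someone wants to reproduce it; apart from archive_mode (visible in the output above), the exact columns may differ by DBR version, hence the printSchema call:

# Sketch: inspect Autoloader's per-file state recorded in the checkpoint.
state = spark.sql(
    "SELECT * FROM cloud_files_state('s3://my_bucket/data_checkpoint')"
)
state.printSchema()  # see which archive-related columns this DBR exposes
state.groupBy("archive_mode").count().show()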

Setting cloudFiles.cleanSource to DELETE also does nothing.

All tested on clusters running DBR 16.4 or DBR 17.

Do we have to turn this feature on somewhere, or did I implement something incorrectly? I really do not know what to try next.

Thank you
Jan


szymon_dybczak
Esteemed Contributor III

Hi @janm2 ,

Could you try replacing the value in your code with uppercase "MOVE" (or "DELETE", depending on which one you want to use)?
I know it sounds silly, but I've encountered several cases where case sensitivity shot me in the foot 🙂

And since you wrote that there is no archive_mode set for the data, that makes me wonder whether this is the reason 😄

.option("cloudfiles.cleanSource", "MOVE")

 

janm2
New Contributor II

Hello, thank you for your reply.

I have tried this before and unfortunately this was not it.
