Autoloader and "cleanSource"
05-31-2022 07:45 AM
Hi All,
We are trying to use the Spark 3 Structured Streaming option `.option('cleanSource', 'archive')` to archive processed files.
This works as expected with the standard Spark implementation, but it does not appear to work with Auto Loader. I cannot find any documentation stating whether this is supported or not, or whether it is a bug or expected behaviour. We have tried various tweaks, to no avail.
Is this a bug or expected behaviour?
Is there an alternative approach using Auto Loader?
Thanks Larry
from datetime import datetime
from pyspark.sql.functions import lit

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cleanSource", "archive")         # accepted without error, but has no effect
    .option("sourceArchiveDir", archivePath)
    .option("header", "true")
    .schema(schema)
    .load(path)
    .withColumn("loadDate", lit(datetime.utcnow()))
)
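For comparison, here is a minimal sketch of the same stream against the plain Spark file source, where cleanSource/archive behaves as documented. It reuses the same schema, path, and archivePath from above; note the Spark docs expect sourceArchiveDir not to overlap the source pattern.

# Sketch: the plain Spark file source, where cleanSource/archive
# is documented and working.
df_plain = (
    spark.readStream
    .format("csv")                            # plain file source instead of cloudFiles
    .option("cleanSource", "archive")         # move processed files...
    .option("sourceArchiveDir", archivePath)  # ...into this archive directory
    .option("header", "true")
    .schema(schema)
    .load(path)
)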
06-01-2022 05:59 AM
https://docs.databricks.com/ingestion/auto-loader/options.html#common-auto-loader-options
cleanSource is not a listed option, so it won't do anything.
Maybe event retention is something you can use?
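If by event retention you mean Auto Loader's cloudFiles.maxFileAge option, a hedged sketch is below. It bounds how long discovered-file events are retained in the checkpoint state, which limits state growth; unlike cleanSource, it does not move or delete the source files themselves. (schema and path are assumed to be defined as in the original snippet.)

# Sketch: bounding Auto Loader's tracked file events with maxFileAge.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.maxFileAge", "30 days")  # expire tracked file events after 30 days
    .schema(schema)
    .load(path)
)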
06-01-2022 06:03 AM
Yes, but since cleanSource is part of the native Spark implementation for file streaming, I think the docs should specify it either way?
06-01-2022 06:12 AM
Auto Loader is only available on Databricks, not in the OSS version of Spark, so that is entirely possible.
Maybe a Databricks dev can step in and clear this up?
06-06-2022 07:23 AM
Apologies, I meant that the cleanSource option is part of native Spark 3.0, so if it doesn't work with Auto Loader I would expect the docs to state that it is not supported, or an error to be raised when it is included in the code. Currently it accepts the option and does nothing.
06-07-2022 04:11 AM
It seems the docs state what is supported, not what is not supported.
But I agree that could be a discussion point.
By the way, the standard Spark read function doesn't raise errors either when you pass an invalid option; it is silently ignored.
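For example, a quick illustration of that behaviour; the option name here is made up purely to demonstrate it, and path is assumed to point at an existing CSV file:

# Demo: Spark silently ignores unrecognised options on read.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("thisOptionDoesNotExist", "true")  # no error, simply ignored
    .load(path)
)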