cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Migrating source directory in an existing DLT Pipeline with Autoloader

SamAdams
Contributor

I have a DLT pipeline that reads data in S3 into an append-only bronze layer using Autoloader. The data sink needs to be changed to a new s3 bucket in a new account, and data in the existing s3 bucket migrated to the new one.

Will Autoloader still be able to tell that it has already processed those files after they have been replicated to the new S3 bucket?

I'm thinking of setting Autoloader's `cloudfiles.includeExistingFiles=False` and the table property `{"pipelines.reset.allowed": "false"}` to avoid re-processing all that old Bronze layer data if not. Sample Python code below, for reference:

@dlt.table(
name=f"append_only_bronze_layer",
table_properties={"quality": "bronze"},
)
def raw_bronze_layer() -> DataFrame:
return (
spark.readStream.format("cloudFiles")
.options(
cloudFiles.format="avro",
cloudFiles.inferColumnTypes=True,
)
.load(/this/path/will/change)
)

Thanks in advance for any advice on how to avoid re-processing data when the DLT source path changes.

4 REPLIES 4

SamAdams
Contributor

Brahmareddy
Honored Contributor III

Hi SamAdams,

How are you doing today?, As per my understanding, You're on the right track here! When you change the S3 path for Autoloader, even if the files are exactly the same and just copied from the old bucket, Autoloader will treat them as new files because it tracks them based on their original path and metadata. So yes, it could reprocess everything unless you take steps to avoid it. Setting cloudFiles.includeExistingFiles=False is a smart move—it tells Autoloader to ignore any existing files in the new path and only pick up new ones going forward. Adding {"pipelines.reset.allowed": "false"} also helps make sure the pipeline doesn’t accidentally reset and reprocess old data. As long as you keep your checkpoint location the same, and the old files don’t get picked up again, you should be safe. Let me know if you need help double-checking your setup before switching buckets!

Regards,

Brahma

SamAdams
Contributor

@Brahmareddy just to follow up, those properties above worked as expected in some small tests when I changed the directory WITHIN a bucket, but there were a few more details that mattered here when changing the bucket:

1. When changing buckets and not triggering a full reset I needed to add this spark configuration `spark.databricks.cloudFiles.checkSourceChanged false  [1] to keep the stream going. Otherwise you end up with a StreamingQueryException: The bucket in the file event `{"backfill":{"bucket":"<bucket-name>","key":"<path-to-key>","size":<size>,"eventTime":<unix-time>}}` is different from expected by the source: `<new-bucket-name>`

2. However, I should have just triggered a full pipeline reset & taken advantage of that "no reset allowed" property for the bronze layer to keep it as is. The `includeExistingFiles=False` autoloader property only takes effect on the FIRST time it's run, so when doing a regular pipeline update with the new destination it went ahead and re-processed all the old data I had copied there as if it were new.

In retrospect it would probably have been simpler to copy the data to a different folder in the S3 bucket and then move it back together with the new data once a full backfill was needed.

Regardless, thanks for the suggestions.

[1] https://kb.databricks.com/en_US/streaming/error-when-trying-to-run-an-auto-loader-job-that-uses-clou...

 

Brahmareddy
Honored Contributor III

Hi SamAdams,

How are doing today? , Really appreciate you sharing what worked and what didn’t in your case! You're absolutely right—when switching buckets, not just folders within a bucket, that spark.databricks.cloudFiles.checkSourceChanged config becomes critical to avoid that StreamingQueryException. Also great point about the includeExistingFiles=False only applying on the very first run—it's one of those gotchas that can easily catch folks off guard when doing incremental updates or migrations. Doing a full reset and isolating the bronze layer with the pipelines.reset.allowed = false property is a smart call, especially when you want to preserve downstream processing. And yeah, pre-copying to a separate folder and moving it in when ready is a nice, clean strategy too. Thanks again for sharing—this will definitely help others running into similar challenges!

Regards,

Brahma

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now