So imagine you are a Data Engineer using Databricks Auto Loader to ingest data from cloud storage, and you suddenly realize that some recently ingested data has a problem: you'd like to simply delete it from the bronze layer and re-ingest it. This blog post walks through an example of that scenario, presents a recommended solution, and notes alternative approaches along with their drawbacks compared to the recommended one.
Let’s consider a contrived example that highlights the key aspects of our scenario.
Suppose you are a Data Engineer at a new fitness watch company, and you set up an Auto Loader-based pipeline to ingest events that come in from people's fitness watches every time they finish a run/jog. These events land in JSON files and have the form:
{ "user_id": "1",
"start_at": "2024-06-01 10:42:58",
"finish_at":"2024-06-01 11:38:45",
"distance": 6.5
}
and your (simplified) ingestion pipeline looks something like the code below. Treat it as illustrative pseudocode rather than full PySpark code you can copy/paste into your own notebook.
source_dir = "/Volumes/bronze/default/raw/json"
checkpoint = "/Volumes/bronze/default/raw/checkpoint"
bronze_table = "bronze.default.raw"
df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
)
First, note the use of the _metadata (file metadata) column. Embedding the name of the source JSON file and its modification time is crucial to the solution we will outline below. More generally, embedding file metadata in your Bronze layer when ingesting from files in cloud storage is considered a best practice, because it lets you trace each row of raw data back to the file it came from.
Our bronze.default.raw table looks something like this:
user_id | start_at | finish_at | distance | _FILE_PATH | _FILE_MODIFICATION_TIME |
1 | 2024-06-01 10:42:58 | 2024-06-01 11:38:45 | 6.5 | 1394.json | 2024-06-01 11:20:00 |
2 | 2024-06-02 07:19:49 | 2024-06-02 07:55:19 | 4.25 | 1395.json | 2024-06-02 07:55:25 |
3 | 2024-06-03 18:55:32 | 2024-06-03 20:10:04 | 30.6 | 1396.json | 2024-06-03 20:10:09 |
And we are happy with how our ingest pipeline is running.
But suppose we get an email on the morning of 2024-06-04 from a data analyst asking about a new field that was rolled out in a software update two days prior. On 2024-06-02, a software update was pushed to all the watches, and those events now have a new activity field indicating what kind of fitness activity was done, for example "running" or "cycling". No one told Data Engineering ahead of time, so we have likely already ingested some events without capturing this new field.
We talk to release engineering and get a date and time for when the new version was pushed, then we look at our raw JSON files and pinpoint when the change started appearing in the data. We find:
Filename | Modification Time | Payload |
1395.json | 2024-06-02 07:55:25 | { "user_id": 2, "start_at": "2024-06-02 07:19:49", "finish_at": "2024-06-02 07:55:19", "distance": 4.25 } |
1396.json | 2024-06-03 20:10:09 | { "user_id": 3, "start_at": "2024-06-03 18:55:32", "finish_at": "2024-06-03 20:10:04", "activity": "cycling", "distance": 30.6 } |
And sure enough, the JSON payload in 1396.json contains the new activity field. Now, while we can update our Bronze table schema to add this field, we've already ingested file 1396.json without capturing it, so can we somehow tell Auto Loader to re-ingest that file (and any others that come after it)? We could, of course, delete the entire bronze table and re-ingest everything from scratch, but we'd rather not. Can we somehow re-ingest just these most recent files?
No...but also, yes.
One of Auto Loader's main selling points is its "ingest once, and only once" behavior. By default, Auto Loader scans your cloud storage folder and ingests the files it finds there. It also keeps track of which files have already been ingested, so that if it fails and restarts, it will not re-ingest those files again. Where does Auto Loader keep track of the files it has already ingested? In its checkpoint.
The checkpoint location is specified in our code sample above:
...
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
...
We just give Auto Loader a directory (a folder in a Volume) in which to store its housekeeping information. While we as PySpark programmers can't modify the checkpoint directly, we can conceptually think of it as containing a table of the form:
File path | Modification time |
/Volumes/bronze/default/raw/json/1394.json | 2024-06-01 11:20:00 |
/Volumes/bronze/default/raw/json/1395.json | 2024-06-02 07:55:25 |
/Volumes/bronze/default/raw/json/1396.json | 2024-06-03 20:10:09 |
If we were to re-run our pipeline code, Auto Loader would check each file against the checkpoint, see that 1396.json has already been ingested, and refuse to ingest it again.
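Incidentally, if you ever want to see which files Auto Loader has recorded, Databricks exposes a read-only view of the checkpoint's file-level state through the cloud_files_state table-valued function (availability depends on your runtime version). A minimal sketch against our checkpoint path:

SELECT * FROM cloud_files_state('/Volumes/bronze/default/raw/checkpoint')

It is strictly read-only, though: it doesn't give us a way to make Auto Loader forget a file or ingest it again.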
Now that we've articulated the problem, and given some background on Auto Loader...what's the solution?
Well, no matter what, we'll have to update our table schema and fix our Spark code to take the new field into account:
ALTER TABLE bronze.default.raw ADD COLUMN (activity string AFTER finish_at)
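If you want to double-check that the new column landed where you expect, a quick sanity check works:

DESCRIBE TABLE bronze.default.raw

and the updated pipeline code becomes: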
source_dir = "/Volumes/bronze/default/raw/json"
checkpoint = "/Volumes/bronze/default/raw/checkpoint"
bronze_table = "bronze.default.raw"
df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
)
But what about the data that we already ingested? That 1396.json file that has the activity field -- we want to re-ingest that somehow.
Whatever approach we take, there are really two steps: remove the already-ingested rows that came from the affected files, and then get those files re-ingested.
Fortunately, removing the offending rows is easy to do, because we embedded the file name and file modification time into our bronze table. Look up the modification time of the first file that contains the extra field and delete all the rows with that timestamp (or newer):
DELETE FROM bronze.default.raw WHERE _FILE_MODIFICATION_TIME >= '2024-06-03 20:10:09'
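Before running the DELETE, it's worth a quick sanity check of exactly which rows (and how many) will be removed, using the same cutoff timestamp:

SELECT _FILE_PATH, _FILE_MODIFICATION_TIME, count(*) AS row_count
FROM bronze.default.raw
WHERE _FILE_MODIFICATION_TIME >= '2024-06-03 20:10:09'
GROUP BY _FILE_PATH, _FILE_MODIFICATION_TIME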
Re-ingesting the affected files is where things can get tricky. As mentioned above, Auto Loader really, really, really wants to ingest a file once and only once. So, what can we do? We have a few options:
1. Do a one-off, manual re-ingest of the affected files with a plain batch Spark read, outside of Auto Loader.
2. Set cloudFiles.allowOverwrites to True and update the modification times of the affected files so Auto Loader treats them as new.
3. Run a temporary Auto Loader stream against a separate checkpoint, using modifiedAfter (and possibly modifiedBefore) to target just the affected files.
Let's dive deeper into each of these.
IMO, the first option, the one-off manual re-ingest, is the preferred one. It is simple and less risky than the other two. In this approach, we do two things: figure out which files need to be re-ingested (using the same modification-time cutoff we used for the DELETE), and then read those files with a plain batch Spark job and append them to the bronze table.
We don't even need to bother with Spark Structured Streaming; we can just use a plain batch read and write to the same Delta table, bronze.default.raw, that our streaming pipeline writes to.
from pyspark.sql.functions import col

df = (
    # One-off batch read of just the affected file(s)
    spark.read
        .format( "json" )
        .schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
        .load( "/Volumes/bronze/default/raw/json/1396.json" )
        .select( "*",
                 col( "_metadata.file_path" ).alias( "_FILE_PATH" ),
                 col( "_metadata.file_modification_time" ).alias( "_FILE_MODIFICATION_TIME" )
        )
        # Append to the same bronze table the streaming pipeline writes to
        .write
        .mode( "append" )
        .saveAsTable( "bronze.default.raw" )
)
We can just do a one-off execution of code like the above in a stand-alone notebook. Or write a one-off job to run it.
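If more than one file needs re-ingesting, the same batch read works with a list of paths: build a small manifest of the affected files (for example, from the modification-time cutoff) and pass it to load(). A sketch, where the second path is hypothetical:

from pyspark.sql.functions import col

# Manifest of affected files; 1397.json is a hypothetical later arrival
files_to_reingest = [
    "/Volumes/bronze/default/raw/json/1396.json",
    "/Volumes/bronze/default/raw/json/1397.json",
]

df = (
    spark.read
        .format( "json" )
        .schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
        .load( files_to_reingest )
        .select( "*",
                 col( "_metadata.file_path" ).alias( "_FILE_PATH" ),
                 col( "_metadata.file_modification_time" ).alias( "_FILE_MODIFICATION_TIME" )
        )
        .write
        .mode( "append" )
        .saveAsTable( "bronze.default.raw" )
)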
The main point here is that we don't have to change anything about our main pipeline.
The one thing that feels a little icky about the previous approach is that we are duplicating ingest code/logic: we still have our main pipeline using Auto Loader, and then nearly duplicate logic/code for the one-off "fixup". Wouldn't it be nice if we could somehow tell Auto Loader to just re-ingest some group of files?
In the section above describing Auto Loader and checkpoints, we highlighted that, by default, when Auto Loader decides whether it has already ingested a file, it only looks at the file's path and ignores the modification time. What if we could:
1. tell Auto Loader to also take a file's modification time into account, and
2. update the modification times of the files we want re-ingested?
That way, once we update those files' modification times, Auto Loader will just see them as "new" files and (re)ingest them.
It turns out that #1 is easy, but #2 is actually difficult.
Auto Loader has an option named cloudFiles.allowOverwrites, which is False by default. If we set this option to True, then Auto Loader will look at each file's modification time, and if it's newer than what is stored in the checkpoint, it will (re)ingest the file.
For example, we could just change our main pipeline to:
source_dir = "/Volumes/bronze/default/raw/json"
checkpoint = "/Volumes/bronze/default/raw/checkpoint"
bronze_table = "bronze.default.raw"
df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.option( "cloudFiles.allowOverwrites", True )
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
)
Great! So, how do we update the modification time on /Volumes/bronze/default/raw/json/1396.json so that Auto Loader will automatically re-ingest it? Does our Cloud Provider (e.g. AWS) have the equivalent of the Unix touch command that can update the modification time on an existing file? Unfortunately: no.
It turns out that the only real way to "update" a modification time in cloud object storage is to:
1. copy the object onto itself (which writes a "new" object at the same path), or
2. download the file and re-upload it.
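For example, on AWS the "copy onto itself" route might look roughly like the sketch below. This is an illustration only: it assumes the Volume is backed by an S3 bucket you can reach with boto3, and the bucket and key names are hypothetical.

import boto3

s3 = boto3.client( "s3" )

# Hypothetical bucket/key backing /Volumes/bronze/default/raw/json/1396.json
bucket = "my-landing-bucket"
key = "raw/json/1396.json"

# Copying an object onto itself requires MetadataDirective="REPLACE";
# S3 rejects a self-copy that changes nothing. The copy rewrites the object,
# which refreshes its modification time (and replaces any user-defined metadata).
s3.copy_object(
    Bucket=bucket,
    Key=key,
    CopySource=f"{bucket}/{key}",
    MetadataDirective="REPLACE",
)

Note that this rewrites the whole object, so you pay the copy cost for every file you "touch" this way.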
So, unless you can find some weird trick/hack from the cloud provider to update modification times without having to pay the cost of copying the file (or downloading and reuploading), this is probably not a very feasible option.
Also, there are some esoteric corner cases that can arise if you are using allowOverwrites with Auto Loader's file notification mode.
The third approach is more or less a version of the first, but still using Auto Loader. You could temporarily point Auto Loader at a new (and temporary) checkpoint, then use the modifiedAfter (and maybe modifiedBefore) options to "trick" Auto Loader into re-ingesting the desired files.
This has the same downside as the first approach in that you'd be duplicating the ingest logic/code again; the only real benefit is that you wouldn't have to build a manifest of the files to re-ingest: you would just give Auto Loader start/end timestamps and it would do the rest.
It would look something like this:
source_dir = "/Volumes/bronze/default/raw/json"
checkpoint_reingest = "/Volumes/bronze/default/raw/checkpoint_reingest"
bronze_table = "bronze.default.raw"
df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.option( "modifiedAfter", "2024-06-02 07:55:25" ) # before the 1396.json file
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint_reingest )
.toTable( bronze_table )
)
Note that modifiedAfter means strictly after, so you'd have to pick a date and time earlier than the modification time of the first file you want to re-ingest.
Auto Loader is a powerful tool for automatically ingesting data from files in cloud storage. But sometimes bad data gets ingested and we want to essentially back up, fix the problem, then continue on. Today, Auto Loader doesn't have a built-in capability to do that, but we've outlined a few different approaches for dealing with the situation.
Storing the file path and modification time in the Bronze table is absolutely critical, and something you should be doing no matter what.
After that, the simplest and most straightforward approach is a one-off, manual re-ingest of the affected files. It's up to you to weigh the trade-offs and choose what's right for you and your situation.