cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
Aaron_Binns
Databricks Employee
Databricks Employee

So imagine, you are a Data Engineer using Databricks Auto Loader to ingest data from cloud storage, when you suddenly realize that recently ingested data has some problem and you’d like to simply delete it from the bronze layer and re-ingest it.  This blog post gives an example of that scenario, followed by a recommended solution, as well as noting alternative approaches, each with their drawbacks compared to the recommended solution.

Let’s consider a contrived example that highlights the key aspects of our scenario.

Suppose you are a Data Engineer at a new fitness watch company, and you setup an Auto Loader-based pipeline to ingest events that are coming in from people's fitness watches ever time the finish a run/jog.  These events are landing in JSON files and have the form:

{ "user_id": "1", 
"start_at": "2024-06-01 10:42:58",
"finish_at":"2024-06-01 11:38:45",
"distance": 6.5
}
 

and your (simplified) ingestion pipeline looks something like the following.  The following code is really just pseudocode, to illustrate the point, and not full PySpark code that you can copy/paste into your own notebook.

source_dir   = "/Volumes/bronze/default/raw/json"
checkpoint = "/Volumes/bronze/default/raw/checkpoint"
bronze_table = "bronze.default.raw"

df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
)

First note the use of the _metadata (File metadata) column.  Embedding the name of the source JSON file and its modification time is crucial to the solution we will outline below.  Generally, it is considered a best practice to embed file metadata information into your Bronze layer when ingesting from files in cloud storage, so that we can know which file each row of our raw data came from.

Our bronze.default.raw table looks something like

user_id start_at finish_at distance _FILE_PATH _FILE_MODIFICATION_TIME
1 2024-06-01 10:42:58 2024-06-01 11:38:45 6.5 1394.json 2024-06-01 11:20:00
2
2024-06-02 07:19:49
2024-06-02 07:55:19
4.25 1395.json 2024-06-02 07:55:25
3
2024-06-03 18:55:32
2024-06-03 20:10:04
30.6 1396.json 2024-06-03 20:10:09

And we are happy with how our ingest pipeline is running.

Problem

But, suppose we get an email the morning of 2024-06-03 from a data analyst asking about that new field that was rolled-out in a software update two days prior.  On 2024-06-02, a software update was pushed to all the watches and now those events have a new activity field to indicate what kind of fitness activity is being done.  For example, "running" or "cycling".   No one told Data Engineering ahead of time, so we likely have ingested some data that is missing this new field.

We talk to release engineering and get a date+time of when the new version was pushed, then we look at our raw JSON files and pinpoint when the change started appearing in the data. We find

Filename Modification Tme Payload
1395.json 2024-06-02 07:55:25 { "user_id": 2,
  "start_at": "2024-06-02 07:19:49",
  "finish_at": "2024-06-02 07:55:19",
  "distance": 4.25 }
1396.json 2024-06-03 20:10:09 { "user_id": 3,
  "start_at": "2024-06-03 18:55:32",
  "finish_at": "2024-06-03 20:10:04",
  "activity": "cycling",
  "distance": 30.6
}

And sure enough, in 1396.json the JSON payload contains that new activity field.  Now, while we can update our Bronze table schema to have this new field, we've already ingested file 1396.json without that field, so can we somehow tell Autoloader to re-ingest that file (and any others coming after it)?  We of course could delete the entire bronze table and re-ingest it completely from scratch, but we'd rather not.  Can we somehow just re-ingest these most recent files?

No...but also, yes.

Auto Loader and Checkpoint

One of Auto Loader's main selling-points is its "ingest once, and only once" behavior.  By default, Auto Loader will scan your cloud storage folder and ingest the files it finds there.  It also keeps track of which files have been ingested already, so that if it fails and restarts, it will not re-ingest all those files again.  Where does Auto Loader keep track of the files it's already ingested?  Checkpoint.

The checkpoint is specified in our code sample above

  ...
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
...

we just give Auto Loader a directory (a folder in a Volume) to store it's housekeeping information.  While we as PySpark programmers can't peek inside the checkpoint, nor modify it, we can conceptually think of the checkpoint containing a table of the form

File path Modification time
/Volumes/bronze/default/raw/json/1394.json 2024-06-01 11:20:00
/Volumes/bronze/default/raw/json/1395.json 2024-06-02 07:55:25
/Volumes/bronze/default/raw/json/1396.json 2024-06-03 20:10:09

If we were to re-run our pipeline code, Auto Loader would check each file against the checkpoint and see that we've already ingested 1396.json once, and will refuse to ingest it again.

Solutions

Now that we've articulated the problem, and given some background on Auto Loader...what's the solution?

Well, no matter what, we'll have to update our table schema and fix our Spark code to take in to account the new field

ALTER TABLE bronze.default.raw ADD COLUMN (activity string AFTER finish_at)
source_dir = "/Volumes/bronze/default/raw/json"
checkpoint = "/Volumes/bronze/default/raw/checkpoint"
bronze_table = "bronze.default.raw"

df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
)

But what about the data that we already ingested?  That 1396.json file that has the activity field -- we want to re-ingest that somehow.

We have a couple options

  1. Do nothing
    Just accept that some of the data we ingested is missing the activity field and move on
    Not a great option, as we want that activity field correctly represented in our Lakehouse

  2. Re-ingest all the data
    We could simply delete the bronze.default.raw table, the associated checkpoint, and then re-run our pipeline to re-ingest all of our data again
    This is potentially a very expensive option, and just feels like overkill since all we want to do is re-ingest some relatively small portion of recent data

  3. Delete the incorrect data from bronze.default.raw and then re-ingest just the necessary files
    That's really what we want, so lets explore that in more detail!

Remove offending rows

Fortunately, removing the offending rows is easy to do, because we embedded the file name and file modification time into our bronze table.  Lookup the timestamp of the first file that contains this extra field and delete all the rows with that timestamp (or newer)

DELETE FROM bronze.default.raw WHERE _FILE_MODIFICATION_TIME >= '2024-06-03 20:10:09'

Re-Ingest Files

This is where things can get tricky.  As mentioned above, Auto Loader really, really, really wants to ingest a file once and only once.  So, what can we do?  We have a few options

  1. Leave Auto Loader alone and manually re-ingest the files in a one-off command/job
  2. Convince Auto Loader to look at file timestamps and re-ingest files with a new Modification Time
  3. Temporarily point Auto Loader to an "empty" checkpoint and re-ingest files newer than a certain date

Let's dive deeper into each of these.

Manual Re-ingest

IMO, this is the preferred option.  It is simple and less risky than the other two.  In this approach, we do two things

  1. Create a list of files we want to re-ingest
  2. Manually run nearly the same code as our Auto Loader pipeline to just re-ingest those files.

We don't even need to bother with Spark Structured Streaming, we can just use Spark and write to the same Delta table bronze.default.raw that our streaming pipeline is writing to.

df = (
spark.read
.format( "json" )
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
.load( "/Volumes/bronze/default/raw/json/1396.json" )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.write
.mode( "append" )
.saveAsTable( "bronze.default.raw" )
)

We can just do a one-off execution of code like the above in a stand-alone notebook.  Or write a one-off job to run it.

The main point here is that we don't have to change anything about our main pipeline.

Auto Loader use timestamps

The one thing that feels a little icky about the previous approach is that we are duplicating ingest code/logic.  We still have our main pipeline using Auto Loader, then we create nearly duplicate logic/code for the one-off "fixup".  Wouldn't it be nice if we could somehow tell Auto Loader to just re-ingest some group of files?

In our previous section describing Auto Loader and Checkpoints, we highlight that when it comes to Auto Loader deciding if it's already ingested a file already, Auto Loader only looks at the file's path and ignores the modification time.  What if we could:

  1. Tell Auto Loader to look at both file path and the modification time, and if the file has a newer modification time than what's stored in the Checkpoint, then re-ingest the file
  2. Update the modification times on the files we want to re-ingest

this way, once we update those file modification times, Auto Loader will just see them as "new" files and (re)ingest them.

It turns out that #1 is easy, but #2 is actually difficult.

Auto Loader has an option named cloudFiles.allowOverwrites, which is False by default.  If we set this option to True, then Auto Loader will look at each file's modification time and if it's newer than what stored in the Checkpoint, it will (re)ingest the file.

For example, we could just change our main pipeline to

source_dir = "/Volumes/bronze/default/raw/json"
checkpoint = "/Volumes/bronze/default/raw/checkpoint"
bronze_table = "bronze.default.raw"

df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.option( "cloudFiles.allowOverwrites", True )
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint )
.toTable( bronze_table )
)

Great!  So, how do we update the modification time on /Volumes/bronze/default/raw/json/1396.json so that Auto Loader will automatically re-ingest it?  Does our Cloud Provider (e.g. AWS) have the equivalent of the Unix touch command that can update the modification time on an existing file?  Unfortunately: no.

Turns out that they only real way to "update" a modification time is to

  • Download and re-upload the file
  • Copy it to another location, then copy it back to the original location
  • Other weird tricks from the cloud provider?

So, unless you can find some weird trick/hack from the cloud provider to update modification times without having to pay the cost of copying the file (or downloading and reuploading), this is probably not a very feasible option.

Also, there are some esoteric corner cases that can arise if you are using allowOverwrites with Auto Loader's file notification mode.

Temporary empty checkpoint

The third approach is more-or-less a version of the first, but still using Auto Loader.  You could temporarily point Auto Loader to a new (and temporary) Checkpoint, then use the modifiedAfter (and maybe modifiedBefore) options to "trick" Auto Loader into re-ingesting our desired files.

This has the same downside as the first approach in that you'd be duplicating the ingest logic/code again; and the only real benefit is that you wouldn't have to build a manifest of the files to re-ingest, you would just give Auto Loader start/end timestamps and it would do the rest.

It would look something like

source_dir = "/Volumes/bronze/default/raw/json"
checkpoint_reingest = "/Volumes/bronze/default/raw/checkpoint_reingest"
bronze_table = "bronze.default.raw"

df = (
spark.readStream
.format( "cloudFiles" )
.option( "cloudFiles.format", "json" )
.option( "modifiedAfter", "2024-06-02 07:55:25" ) # before the 1396.json file
.schema( "user_id bigint, start_at timestamp, finish_at timestamp, activity string, distance decimal(10,4)" )
.load( source_dir )
.select( "*",
col( "_metadata.file_path") .alias("_FILE_PATH" ),
col( "_metadata.file_modification_time").alias("_FILE_MODIFICATION_TIME" )
)
.writeStream
.option( "checkpointLocation", checkpoint_reingest )
.toTable( bronze_table )
)

Note that modifiedAfter means after, so you'd have to pick a date+time before first file you want to re-ingest.

Coda

Auto Loader is a powerful tool for automatically ingesting data from files in Cloud Storage.  But sometimes bad data gets ingested and we want to essentially "back-up, fix the problem, then continue on".  Today, Auto Loader doesn't have any built-in capabilities to back-up, fix the problem, then continue on, but we've outlined a couple different approaches to dealing with this situation.

Storing the file path and modification time in the Bronze table is absolutely critical, and something you should be doing no matter what.

After that, the simplest and most straightforward approach is to do a one-off, manual re-ingest of the files.  It's up to you weigh the trade-offs and choose what's right for you and your situation.