Hi @SantiNath_Dey,
Good question. This is a pretty common pattern, and yes — Auto Loader rescue mode is a strong fit for it. The cleanest way to think about the solution is in three parts: ingest safely, detect drift, and surface it through workflow failure notifications.
Step 1: Use Auto Loader in rescue mode
The key setting here is:
cloudFiles.schemaEvolutionMode = "rescue"
That tells Auto Loader not to evolve the schema and not to fail the stream when it encounters unexpected fields, type changes, or precision mismatches. Instead, anything that does not conform to the schema you provided gets captured in the _rescued_data column as JSON.
from pyspark.sql.types import StructType

input_path = "s3://.../landing/json"
bronze_table = "raw.bronze_events"
checkpoint_path = "s3://.../chk/autoloader/json"

schema = StructType([
    # Define your expected nested JSON schema here
])

df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", checkpoint_path + "/schema")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .schema(schema)
        .load(input_path)
)
Since you are working with complex nested JSON, I would also strongly consider cloudFiles.schemaHints for any critical nested fields. On deeply nested structures, first-pass inference can get a little unpredictable, and schema hints help you lock down the parts that matter most.
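For reference, schema hints are passed as a single comma-separated, DDL-style string of `column type` pairs. A minimal sketch of building that string (the nested field names and types here are purely illustrative, not from your data):

```python
# Hypothetical nested fields to pin down; names and types are illustrative only
hints = {
    "payload.user.id": "BIGINT",
    "payload.amount": "DECIMAL(18, 2)",
}

# cloudFiles.schemaHints expects a comma-separated, DDL-style string
schema_hints = ", ".join(f"{field} {dtype}" for field, dtype in hints.items())
# then pass it via: .option("cloudFiles.schemaHints", schema_hints)

print(schema_hints)
```

Keeping the hints in a dict like this makes it easy to review and extend them as you learn which nested fields are drift-prone.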
Step 2: Detect schema drift and write it to a Delta table
From there, use foreachBatch to inspect each micro-batch for rescued rows. If _rescued_data is populated, that is effectively your drift signal. You can log those records to a dedicated Delta table with a timestamp and file metadata so you have a proper audit trail.
from pyspark.sql.functions import current_timestamp, col

drift_table = "quality.schema_drift_events"

def record_drift(batch_df, batch_id):
    # First write the full batch to Bronze
    batch_df.write.format("delta").mode("append").saveAsTable(bronze_table)

    # Then isolate drifted rows
    drift_df = (
        batch_df
        .filter(col("_rescued_data").isNotNull())
        .select(
            current_timestamp().alias("detected_at"),
            col("_metadata.file_name").alias("file_name"),
            col("_rescued_data"),
        )
    )

    # Count once, then reuse it, so the DataFrame is not recomputed
    drift_count = drift_df.count()
    if drift_count > 0:
        drift_df.write.format("delta").mode("append").saveAsTable(drift_table)
        raise Exception(
            f"Schema drift detected in {drift_count} records. "
            f"See table {drift_table} for full details."
        )

(
    df.writeStream
        .foreachBatch(record_drift)
        .option("checkpointLocation", checkpoint_path + "/stream")
        .trigger(availableNow=True)
        .start()
)
That write-then-raise pattern is doing two useful things at once:
- the write persists the drift details to a Delta table
- the intentional raise forces the Workflow task to fail, which becomes the trigger for the notification
Step 3: Let Databricks Workflows handle the email alert
Once the task fails, Databricks Workflows can take over and send the notification. You do not get total control over the email body, but in this case that is usually fine.
The failure email and the job run details will include the exception message, something like:
“Schema drift detected in N records. See table quality.schema_drift_events for full details.”
That gives the team the signal immediately, while the actual detail — what drifted, from which file, and when — is preserved in the Delta log table for investigation.
So the setup becomes pretty straightforward:
- build the ingestion as a Workflow task
- configure email notifications on task failure
- point the alerts to the appropriate DL or owner group
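If you manage jobs as code, the failure notification can live right in the job spec. Here is a sketch of the relevant Jobs API task fragment; the task key, notebook path, and email address are placeholders, not values from your environment:

```python
# Hypothetical Jobs API task fragment; task_key, notebook_path, and the
# address below are placeholders for your own values
task_settings = {
    "task_key": "bronze_ingest",
    "notebook_task": {"notebook_path": "/path/to/ingest_notebook"},
    "email_notifications": {
        # fires whenever the task fails, e.g. on the intentional raise
        "on_failure": ["data-platform-team@example.com"],
    },
}
```

The same `email_notifications` block can also be set at the job level if you want one alert policy covering every task.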
A few practical callouts
- The .count() inside foreachBatch is perfectly reasonable at moderate scale. If you are dealing with very large micro-batches, you may want to log first and let a lightweight downstream step determine whether new drift records were written.
- If later on you decide you want to accept new columns automatically but still catch type or precision mismatches, then schemaEvolutionMode = "addNewColumns" plus a rescued data column is worth looking at. That is a different operating model, but sometimes the right one as pipelines mature.
- _rescued_data works well for capturing drift, but deeply nested structures can still produce edge cases. That is another reason I like pairing rescue mode with a well-defined schema and targeted schema hints instead of relying too heavily on inference.
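For completeness, the addNewColumns operating model mentioned above mostly comes down to swapping the evolution mode while keeping a rescued data column. A sketch of that option set, as a plain dict you could unpack into the reader:

```python
# Hypothetical option set for the addNewColumns operating model:
# new top-level columns get added to the schema automatically, while
# type/precision mismatches still land in the rescued data column
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.rescuedDataColumn": "_rescued_data",  # the default name
}
```

One behavioral note if you go this route: when a new column shows up, the stream stops so the schema can evolve, and picks the new column up on restart, so plan for automatic retries on that task.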
Hope that helps. Let us know how it goes.