Data lakes have notoriously struggled to manage incremental data processing at scale without an open table format framework (e.g. Delta Lake, Apache Iceberg, Apache Hudi). Schema management is also difficult with schema-less data and schema-on-read methods. On the Databricks Lakehouse, Delta Lake and Apache Spark, integrated with Databricks Auto Loader (AL), provide the essential technologies to stream and process raw data formats incrementally, consistently, and reliably, while maintaining stellar performance and data governance.
Auto Loader features
AL builds on Spark Structured Streaming and adds several benefits, including:
- A Structured Streaming cloudFiles source, available only in Databricks Runtime
- Support for schema drift, dynamic inference, and evolution
- Ingestion of JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE input file formats
- Integration with cloud file notification services (e.g. Amazon SQS/SNS)
- Optimized directory listing mode for discovering new files in cloud storage (e.g. AWS, Azure, GCP, DBFS)
For more information, see the official Databricks Auto Loader documentation.
Schema change scenarios
In this blog I will walk through a few examples of how AL handles schema management and drift, using a public IoT sample dataset with schema modifications. Schema 1 contains the IoT sample dataset schema with all expected columns and data types. Schema 2 contains unexpected changes to that schema: new columns, restructured columns, and a changed data type. The following variables and paths are used throughout this demonstration, along with a Databricks widget to set your user folder name.
%scala
dbutils.widgets.text("dbfs_user_dir", "your_user_name") // widget for account email
val userName = dbutils.widgets.get("dbfs_user_dir")
val rawBasePath = s"dbfs:/user/$userName/raw/"
val repoBasePath = s"dbfs:/user/$userName/repo/"
val jsonSchema1Path = rawBasePath + "iot-schema-1.json"
val jsonSchema2Path = rawBasePath + "iot-schema-2.json"
val repoSchemaPath = repoBasePath + "iot-ddl.json"
dbutils.fs.rm(repoSchemaPath, true) // remove schema repo for demos
Schema 1
%scala
spark.read.json(jsonSchema1Path).printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: long (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
%scala
display(spark.read.json(jsonSchema1Path).limit(10))
Schema 2
%scala
// NEW => device_serial_number_device_type, latitude, longitude, location; CHANGED => humidity (long -> double)
spark.read.json(jsonSchema2Path).printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number_device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: double (nullable = true)
|-- ip: string (nullable = true)
|-- latitude: double (nullable = true)
|-- location: struct (nullable = true)
| |-- cca2: string (nullable = true)
| |-- cca3: string (nullable = true)
| |-- cn: string (nullable = true)
|-- longitude: double (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
%scala
display(spark.read.json(jsonSchema2Path).limit(10))
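To make the drift between the two files explicit, the following plain-Scala sketch compares the top-level columns of Schema 1 and Schema 2. The column/type pairs are transcribed by hand from the printSchema output above, with nested struct fields collapsed to struct; diffSchemas and SchemaDiff are hypothetical names for illustration, not part of Auto Loader or Spark.

```scala
// Top-level columns transcribed from the printSchema output above;
// nested struct fields are collapsed to "struct" for brevity.
val schema1: Map[String, String] = Map(
  "alarm_status" -> "string", "battery_level" -> "long", "c02_level" -> "long",
  "cca2" -> "string", "cca3" -> "string", "cn" -> "string",
  "coordinates" -> "struct", "date" -> "string", "device_id" -> "long",
  "device_serial_number" -> "string", "device_type" -> "string",
  "epoch_time_miliseconds" -> "long", "humidity" -> "long", "ip" -> "string",
  "scale" -> "string", "temp" -> "double", "timestamp" -> "string")

val schema2: Map[String, String] = Map(
  "alarm_status" -> "string", "battery_level" -> "long", "c02_level" -> "long",
  "date" -> "string", "device_id" -> "long",
  "device_serial_number_device_type" -> "string",
  "epoch_time_miliseconds" -> "long", "humidity" -> "double", "ip" -> "string",
  "latitude" -> "double", "location" -> "struct", "longitude" -> "double",
  "scale" -> "string", "temp" -> "double", "timestamp" -> "string")

// Hypothetical result type: added columns, removed columns, and (old, new) types.
final case class SchemaDiff(
    added: Set[String],
    removed: Set[String],
    typeChanged: Map[String, (String, String)])

// Hypothetical helper comparing two flat column -> type maps.
def diffSchemas(before: Map[String, String], after: Map[String, String]): SchemaDiff = {
  val added = after.keySet.diff(before.keySet)
  val removed = before.keySet.diff(after.keySet)
  val typeChanged = before.collect {
    case (col, oldType) if after.get(col).exists(_ != oldType) => (col, (oldType, after(col)))
  }
  SchemaDiff(added, removed, typeChanged)
}

val drift = diffSchemas(schema1, schema2)
```

On this dataset the diff reports four added top-level columns, six removed (restructured) ones, and humidity changing from long to double, which are the cases the following examples exercise.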
Example 1: Schema Tracking/Management
AL tracks schema versions, metadata, and changes to input data over time in the directory path specified by the cloudFiles.schemaLocation option. These features are very useful for tracking the history of data lineage, and they complement the Delta Lake transaction log (DESCRIBE HISTORY) and time travel.
%scala
val rawAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", repoSchemaPath) // schema history tracking
.load(jsonSchema1Path)
)
%scala
rawAlDf.printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: string (nullable = true)
|-- c02_level: string (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: string (nullable = true)
|-- date: string (nullable = true)
|-- device_id: string (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: string (nullable = true)
|-- humidity: string (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
%scala
display(rawAlDf.limit(10))
By default (for JSON, CSV, and XML file formats), AL infers all column data types as strings, including nested fields.
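The difference between this default and the cloudFiles.inferColumnTypes option used in Example 3 can be modeled roughly in plain Scala. This is a simplified sketch, not Auto Loader's implementation: inferScalarType is a hypothetical helper that looks only at raw scalar tokens, and its long, double, string widening order is an assumption chosen to match the types seen in this dataset.

```scala
import scala.util.Try

// Hypothetical model of inferring one column's type from sampled raw JSON scalar tokens.
// With inferColumnTypes = false (the default for JSON), every column is a string.
def inferScalarType(sampledValues: Seq[String], inferColumnTypes: Boolean): String = {
  // Classify a single token as the narrowest type that can parse it.
  def typeOf(token: String): String =
    if (Try(token.toLong).isSuccess) "long"
    else if (Try(token.toDouble).isSuccess) "double"
    else "string"

  // Widen to the most general type observed: long < double < string.
  val rank = Map("long" -> 0, "double" -> 1, "string" -> 2)
  if (!inferColumnTypes || sampledValues.isEmpty) "string"
  else sampledValues.map(typeOf).maxBy(t => rank(t))
}
```

For example, the humidity values 45 and 46 model as string by default and as long with inference enabled, while a sample mixing 45 and 45.5 widens to double.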
Here is the directory structure where AL stores schema versions. These files can be read with the Spark DataFrame API.
Schema Repository
%scala
display(dbutils.fs.ls(repoSchemaPath + "/_schemas"))
Schema Metadata
%scala
display(spark.read.json(repoSchemaPath + "/_schemas"))
Example 2: Schema Hints
AL accepts schema hints in SQL DDL syntax to enforce known data types and override dynamic schema inference, both for simple types and for semi-structured complex types (e.g. structs).
%scala
val hintAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", repoSchemaPath)
.option("cloudFiles.schemaHints", "coordinates STRUCT<latitude:DOUBLE, longitude:DOUBLE>, humidity LONG, temp DOUBLE") // schema ddl hints
.load(jsonSchema1Path)
)
%scala
hintAlDf.printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: string (nullable = true)
|-- c02_level: string (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: string (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: string (nullable = true)
|-- humidity: long (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
The schema hints specified in the AL options map the respective columns to the given data types. Hints are useful for enforcing types on the parts of the schema that are known, while relying on dynamic schema inference (covered in Example 3) for the rest.
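Because the hints value is a single comma-separated DDL string, it can be convenient to assemble it from (column, type) pairs kept alongside the pipeline code. A minimal sketch; schemaHints is a hypothetical helper, and the pairs simply reproduce the hints used in the cell above.

```scala
// Hypothetical helper: render (column, DDL type) pairs as a cloudFiles.schemaHints string.
def schemaHints(hints: Seq[(String, String)]): String =
  hints.map { case (column, ddlType) => s"$column $ddlType" }.mkString(", ")

val iotHints = schemaHints(Seq(
  "coordinates" -> "STRUCT<latitude:DOUBLE, longitude:DOUBLE>",
  "humidity" -> "LONG",
  "temp" -> "DOUBLE"))
// The result can then be passed as .option("cloudFiles.schemaHints", iotHints)
```

Keeping the pairs separate also avoids hand-editing the flat string, where the comma inside the STRUCT type is easy to confuse with a column separator.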
%scala
display(hintAlDf.limit(10))
Example 3: Dynamic Schema Inference
AL infers the schema, including nested structure, from a sample of the dataset. This avoids costly and slow full-dataset scans. The following configurations adjust how much sample data is read to discover the initial schema:
- spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes (default 50 GB)
- spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles (default 1000 files)
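As we understand the Databricks documentation, sampling stops at whichever of these two limits is reached first. The plain-Scala sketch below models that rule; sampleForInference is hypothetical, and its boundary handling (the file that crosses the byte limit is still included) is an assumption rather than documented behavior.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model of inference sampling: take files in discovery order until
// either the file-count limit or the byte limit is reached, whichever comes first.
def sampleForInference(files: Seq[(String, Long)], maxFiles: Int, maxBytes: Long): List[String] = {
  val picked = ListBuffer.empty[String]
  var bytesRead = 0L
  val it = files.iterator
  while (it.hasNext && picked.size < maxFiles && bytesRead < maxBytes) {
    val (name, sizeInBytes) = it.next()
    picked += name
    bytesRead += sizeInBytes
  }
  picked.toList
}
```

To change the real limits in a notebook, the configurations above can be set with spark.conf.set before the stream starts, for example spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", "500"); check the Auto Loader documentation for the accepted value formats.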
%scala
val inferAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", repoSchemaPath)
.option("cloudFiles.inferColumnTypes", true) // schema inference
.load(jsonSchema1Path)
)
%scala
inferAlDf.printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: long (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
AL saves the initial schema to the provided schema location path. This schema serves as the base version for the stream during incremental processing. Together with schema evolution (covered in Example 6), dynamic schema inference automates applying schema changes over time.
%scala
display(inferAlDf.limit(10))
Example 4: Static User-Defined Schema
AL also supports static custom schemas just like Spark Structured Streaming. This eliminates the need for dynamic schema-on-read inference scans, which trigger additional Spark jobs and schema versions. The schema can be retrieved as a DDL string or a JSON payload.
DDL
%scala
inferAlDf.schema.toDDL
String = alarm_status STRING,battery_level BIGINT,c02_level BIGINT,cca2 STRING,cca3 STRING,cn STRING,coordinates STRUCT<latitude: DOUBLE, longitude: DOUBLE>,date STRING,device_id BIGINT,device_serial_number STRING,device_type STRING,epoch_time_miliseconds BIGINT,humidity BIGINT,ip STRING,scale STRING,temp DOUBLE,timestamp STRING,_rescued_data STRING
JSON
%scala
spark.read.json(repoSchemaPath + "/_schemas").select("dataSchemaJson").where("dataSchemaJson is not null").first()
org.apache.spark.sql.Row = [{"type":"struct","fields":[{"name":"alarm_status","type":"string","nullable":true,"metadata":{}},{"name":"battery_level","type":"long","nullable":true,"metadata":{}},{"name":"c02_level","type":"long","nullable":true,"metadata":{}},{"name":"cca2","type":"string","nullable":true,"metadata":{}},{"name":"cca3","type":"string","nullable":true,"metadata":{}},{"name":"cn","type":"string","nullable":true,"metadata":{}},{"name":"coordinates","type":{"type":"struct","fields":[{"name":"latitude","type":"double","nullable":true,"metadata":{}},{"name":"longitude","type":"double","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"date","type":"string","nullable":true,"metadata":{}},{"name":"device_id","type":"long","nullable":true,"metadata":{}},{"name":"device_serial_number","type":"string","nullable":true,"metadata":{}},{"name":"device_type","type":"string","nullable":true,"metadata":{}},{"name":"epoch_time_miliseconds","type":"long","nullable":true,"metadata":{}},{"name":"humidity","type":"long","nullable":true,"metadata":{}},{"name":"ip","type":"string","nullable":true,"metadata":{}},{"name":"scale","type":"string","nullable":true,"metadata":{}},{"name":"temp","type":"double","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}}]}]
Here’s an example of how to generate a user-defined StructType from either the DDL string returned by the DataFrame schema or the JSON queried from the AL schema repository.
%scala
import org.apache.spark.sql.types.{DataType, StructType}
val ddl = """alarm_status STRING,battery_level BIGINT,c02_level BIGINT,cca2 STRING,cca3 STRING,cn STRING,coordinates STRUCT<latitude: DOUBLE, longitude: DOUBLE>,date STRING,device_id BIGINT,device_serial_number STRING,device_type STRING,epoch_time_miliseconds BIGINT,humidity BIGINT,ip STRING,scale STRING,temp DOUBLE,timestamp STRING,_rescued_data STRING"""
val ddlSchema = StructType.fromDDL(ddl)
val json = """{"type":"struct","fields":[{"name":"alarm_status","type":"string","nullable":true,"metadata":{}},{"name":"battery_level","type":"long","nullable":true,"metadata":{}},{"name":"c02_level","type":"long","nullable":true,"metadata":{}},{"name":"cca2","type":"string","nullable":true,"metadata":{}},{"name":"cca3","type":"string","nullable":true,"metadata":{}},{"name":"cn","type":"string","nullable":true,"metadata":{}},{"name":"coordinates","type":{"type":"struct","fields":[{"name":"latitude","type":"double","nullable":true,"metadata":{}},{"name":"longitude","type":"double","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"date","type":"string","nullable":true,"metadata":{}},{"name":"device_id","type":"long","nullable":true,"metadata":{}},{"name":"device_serial_number","type":"string","nullable":true,"metadata":{}},{"name":"device_type","type":"string","nullable":true,"metadata":{}},{"name":"epoch_time_miliseconds","type":"long","nullable":true,"metadata":{}},{"name":"humidity","type":"long","nullable":true,"metadata":{}},{"name":"ip","type":"string","nullable":true,"metadata":{}},{"name":"scale","type":"string","nullable":true,"metadata":{}},{"name":"temp","type":"double","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}}]}"""
val jsonSchema = DataType.fromJson(json).asInstanceOf[StructType]
%scala
val schemaAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema(jsonSchema) // schema structtype definition
.load(jsonSchema1Path)
)
%scala
schemaAlDf.printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: long (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
Passing in the schema definition enforces that schema on the stream. AL also provides a schema enforcement option that achieves essentially the same result as providing a static StructType schema-on-read; this method is covered in Example 7.
%scala
display(schemaAlDf.limit(10))
Example 5: Schema Drift
In rescue mode, AL captures new columns and mismatched data types on read in the rescued data column (_rescued_data). The stream does not fail when schema or data type mismatches are discovered. This is a very impressive feature!
%scala
val driftAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", repoSchemaPath)
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaEvolutionMode", "rescue") // schema drift tracking
.load(rawBasePath + "*.json") // rawBasePath already ends with "/"
)
%scala
driftAlDf.printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: long (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
The rescue column preserves schema drift, such as newly appended columns and/or different data types, as a JSON string payload. This payload can be parsed with the Spark DataFrame or Dataset APIs to analyze schema drift scenarios. The source file path of each individual row is also available in the rescue column to investigate the root cause.
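The rescue behavior can be modeled in plain Scala to show why the stream keeps running: values that match the expected type stay in their column, while unknown columns and type mismatches are moved into a separate map that plays the role of _rescued_data (the mismatched column itself becomes null). This is a conceptual sketch with hypothetical names (rescueRecord, conforms), not Auto Loader's implementation, and it ignores the source file path that the real rescue payload also carries.

```scala
// Hypothetical type check for a single flat value against a simple type name.
def conforms(expectedType: String, value: Any): Boolean = (expectedType, value) match {
  case ("string", _: String) => true
  case ("long", _: Long)     => true
  case ("double", _: Double) => true
  case _                     => false
}

// Hypothetical model of rescue mode for one flat record.
// Returns (columns of the expected schema, fields that would go to _rescued_data).
def rescueRecord(
    expected: Map[String, String],
    record: Map[String, Any]): (Map[String, Option[Any]], Map[String, Any]) = {
  // Expected columns keep their value only when the type matches; otherwise null (None).
  val columns = expected.map { case (col, tpe) =>
    col -> record.get(col).filter(v => conforms(tpe, v))
  }
  // Unknown columns and type mismatches are preserved instead of failing the stream.
  val rescued = record.filter { case (col, value) =>
    expected.get(col).forall(tpe => !conforms(tpe, value))
  }
  (columns, rescued)
}
```

With the Schema 1 types, a Schema 2 record whose humidity is 45.5 keeps temp, nulls humidity, and rescues both the double humidity value and the new device_serial_number_device_type field.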
%scala
display(driftAlDf.where("_rescued_data is not null").limit(10))
Example 6: Schema Evolution
With schemaEvolutionMode set to addNewColumns, AL merges new columns into the schema as they arrive. The updated schema JSON is stored as a new version in the specified schema repository location.
%scala
val evolveAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", repoSchemaPath)
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") // schema evolution
.load(rawBasePath + "*.json") // rawBasePath already ends with "/"
)
%scala
evolveAlDf.printSchema // original schema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: long (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
|-- _rescued_data: string (nullable = true)
%scala
display(evolveAlDf.limit(10)) // stream will fail
AL purposely fails the stream with an UnknownFieldException error when dynamic schema inference detects a schema change. The updated schema is written as a new version (a new metadata file) in the schema repository location and is applied to the input data after the stream is restarted.
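The fail-and-restart cycle described above can be modeled in plain Scala: new columns are appended to the end of the stored schema, existing columns keep their original types, and finding any new column signals that the stream must stop and be restarted against the new schema version. This is a hedged sketch with a hypothetical evolveSchema helper, based on our reading of the documented addNewColumns semantics; it is not Auto Loader's implementation. Because existing column types are not widened in this mode, humidity stays long in the model, which may explain why the restarted stream below pins it with a humidity DOUBLE hint.

```scala
// Hypothetical model of cloudFiles.schemaEvolutionMode = "addNewColumns".
// Returns the merged schema and whether new columns were found (the stream would stop).
def evolveSchema(
    stored: Seq[(String, String)],
    incoming: Seq[(String, String)]): (Seq[(String, String)], Boolean) = {
  val known = stored.map(_._1).toSet
  // Only columns never seen before are appended; existing types are left unchanged.
  val newColumns = incoming.filterNot { case (col, _) => known.contains(col) }
  (stored ++ newColumns, newColumns.nonEmpty)
}
```

Running the merge a second time with the same input finds no new columns, which mirrors the restarted stream continuing normally on the evolved schema.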
%scala
val evolveAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", repoSchemaPath)
.option("cloudFiles.inferColumnTypes", true)
.option("cloudFiles.schemaHints", "humidity DOUBLE")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") // schema evolution
.load(rawBasePath + "*.json") // rawBasePath already ends with "/"
)
%scala
evolveAlDf.printSchema // evolved schema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: double (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
|-- device_serial_number_device_type: string (nullable = true)
|-- latitude: double (nullable = true)
|-- location: struct (nullable = true)
| |-- cca2: string (nullable = true)
| |-- cca3: string (nullable = true)
| |-- cn: string (nullable = true)
|-- longitude: double (nullable = true)
|-- _rescued_data: string (nullable = true)
AL has evolved the schema to merge the newly acquired data fields.
%scala
display(evolveAlDf.where("device_serial_number_device_type is not null").limit(10))
The newly merged schema produced by AL is stored in the original schema repository path as version 1, alongside the base version 0 schema. This history is valuable for tracking schema changes over time, as well as for quickly retrieving DDL on the fly for schema enforcement.
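When retrieving DDL from the repository, the newest version has to be picked out of the _schemas listing. A minimal sketch, assuming the version files are named with plain integers (0, 1, and so on) as the version numbers above suggest; latestSchemaVersion is hypothetical. Comparing numerically rather than lexically matters once there are ten or more versions, since "10" sorts before "9" as a string.

```scala
// Hypothetical helper: pick the newest schema version from file names in _schemas.
// Non-numeric entries are ignored; the comparison is numeric, not lexical.
def latestSchemaVersion(fileNames: Seq[String]): Option[Int] = {
  val versions = fileNames
    .filter(name => name.nonEmpty && name.forall(_.isDigit))
    .map(_.toInt)
  if (versions.isEmpty) None else Some(versions.max)
}
```

In a notebook, the names could come from dbutils.fs.ls(repoSchemaPath + "/_schemas").map(_.name), under the same naming assumption.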
Schema Repository
%scala
display(dbutils.fs.ls(repoSchemaPath + "/_schemas"))
Schema Metadata
%scala
display(spark.read.json(repoSchemaPath + "/_schemas"))
Frequent schema evolution can be a messy problem, but AL and Delta Lake make it easier and simpler to manage. Adding new columns is relatively straightforward, because AL combined with Delta Lake uses schema evolution to append them to the existing schema. Note that the values of these columns will be NULL for data already processed. The greater challenge occurs when data types change, because the new type will not match the data already processed. Currently, the ‘safest’ approach is to perform a complete overwrite of the target Delta table to refresh all data with the changed data type(s). Depending on the data volume, this operation is also relatively straightforward if infrequent. However, if data types change daily or weekly, repeatedly re-processing large data volumes becomes very costly. This can be an indication that the business needs to improve its data strategy.
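The decision described in this paragraph can be written down as a small rule: additive changes can be appended with schema evolution (already-processed rows read back as NULL for the new columns), whereas a type change to an existing column calls for a full overwrite of the target table. A hedged plain-Scala sketch; SchemaChangePlan and its action names are hypothetical. In Delta Lake the two paths typically correspond to appending with the mergeSchema write option and overwriting with overwriteSchema, but check the Delta documentation for your runtime.

```scala
// Hypothetical classification of a schema change against the target Delta table.
object SchemaChangePlan {
  sealed trait Action
  case object AppendAsIs extends Action                // no new columns, no type changes
  case object AppendWithSchemaEvolution extends Action // new columns only
  case object FullOverwrite extends Action             // an existing column changed type

  def plan(target: Map[String, String], incoming: Map[String, String]): Action = {
    val typeChanged = incoming.exists { case (col, tpe) => target.get(col).exists(_ != tpe) }
    val hasNewColumns = incoming.keySet.diff(target.keySet).nonEmpty
    if (typeChanged) FullOverwrite
    else if (hasNewColumns) AppendWithSchemaEvolution
    else AppendAsIs
  }
}
```

Type changes are checked first on purpose: a batch that both adds columns and changes a type still needs the costly overwrite path.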
Constantly changing schemas can be a sign of a weak data governance strategy and a lack of communication with the business owners of the data. Ideally, organizations should have some kind of SLA for data acquisition and know the expected schema. Raw data stored in the landing zone should also follow some kind of pre-ETL strategy (e.g. ontology, taxonomy, partitioning) for better incremental loading performance into the Lakehouse. Skipping these steps can cause a plethora of data management issues that negatively impact downstream consumers building data analytics, BI, and AI/ML pipelines & applications. If upstream schema and formatting issues are never addressed, downstream pipelines will consistently break, increasing cloud storage and compute costs. Garbage in, garbage out.
Example 7: Schema Enforcement
In schema enforcement mode (schemaEvolutionMode set to none), AL validates data against the linked schema version stored in the repository location instead of evolving it. Schema enforcement is a schema-on-write operation: only ingested data matching the target Delta Lake schema is written to the output. Future input schema changes are ignored, and AL streams continue working without failure. Schema enforcement is a very powerful feature of AL and Delta Lake. It ensures that only clean, trusted data is inserted into downstream Silver/Gold datasets used for data analytics, BI, and AI/ML pipelines & applications.
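Conceptually, enforcing a fixed schema means each record is projected onto the known columns: extra fields are dropped, missing fields become null, and the stream keeps running. The plain-Scala sketch below models that with a hypothetical enforceSchema helper; it deliberately leaves out type casting and the optional rescued data column, and it is not Auto Loader's implementation.

```scala
// Hypothetical model of schemaEvolutionMode = "none" with a fixed schema:
// project each record onto the known columns, in schema order.
def enforceSchema(columns: Seq[String], record: Map[String, Any]): (Seq[Option[Any]], Set[String]) = {
  val row = columns.map(record.get)                // missing columns become null (None)
  val ignored = record.keySet.diff(columns.toSet)  // unknown columns are silently dropped
  (row, ignored)
}
```

Returning the ignored field names alongside the row is only for illustration; in the real stream those fields simply disappear unless a rescue column is configured.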
%scala
val enforceAlDf = (spark
.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", repoSchemaPath)
.option("cloudFiles.schemaEvolutionMode", "none") // schema enforcement
.schema(jsonSchema)
.load(rawBasePath + "*.json") // rawBasePath already ends with "/"
)
%scala
enforceAlDf.printSchema
root
|-- alarm_status: string (nullable = true)
|-- battery_level: long (nullable = true)
|-- c02_level: long (nullable = true)
|-- cca2: string (nullable = true)
|-- cca3: string (nullable = true)
|-- cn: string (nullable = true)
|-- coordinates: struct (nullable = true)
| |-- latitude: double (nullable = true)
| |-- longitude: double (nullable = true)
|-- date: string (nullable = true)
|-- device_id: long (nullable = true)
|-- device_serial_number: string (nullable = true)
|-- device_type: string (nullable = true)
|-- epoch_time_miliseconds: long (nullable = true)
|-- humidity: long (nullable = true)
|-- ip: string (nullable = true)
|-- scale: string (nullable = true)
|-- temp: double (nullable = true)
|-- timestamp: string (nullable = true)
Note that the rescue column is no longer present in this example because schema enforcement has been enabled. However, a rescue column can still be configured separately through the rescuedDataColumn option if desired. In addition, schema enforcement mode uses the latest schema version in the repository to enforce incoming data. To enforce an older version, set a user-defined schema as explained in Example 4.
%scala
display(enforceAlDf.limit(10))
Conclusion
At the end of the day, data issues are inevitable. However, the key is to limit data pollution as much as possible and have methods to detect discrepancies, changes, and history via schema management. Databricks Auto Loader provides many solutions for schema management, as illustrated by the examples in this blog. Having a solidified data governance and landing zone strategy will make ingestion and streaming easier and more efficient for loading data into the Lakehouse. Whether it is simply converting raw JSON data incrementally to the Bronze layer as Delta Lake format, or having a repository to store schema metadata, AL makes your job easier. It acts as an anchor to building a resilient Lakehouse architecture that provides reusable, consistent, reliable, and performant data throughout the Data+AI lifecycle.
Thank you for reading this blog. DBC notebook files (Spark Scala & Spark Python) with the code and zipped sample datasets can be found in the accompanying GitHub repo. Disclaimer: at the time of writing, Databricks Community Edition does not support Databricks Auto Loader.