cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Error when reading delta lake files with Auto Loader

Vladif1
New Contributor II

Hi,

When reading Delta Lake file (created by Auto Loader) with this code: df = (

   spark.readStream

   .format('cloudFiles')

   .option("cloudFiles.format", "delta")

   .option("cloudFiles.schemaLocation", f"{silver_path}/_checkpoint")

   .load(bronze_path)

    )

Receives this error:

AnalysisException: Incompatible format detected. A transaction log for Delta was found at `/mnt/f1/f2/_delta_log`, but you are trying to read from `/mnt/f1/f2/` using format("cloudFiles"). You must use 'format("delta")' when reading and writing to a delta table. To disable this check, SET spark.databricks.delta.formatCheck.enabled=false To learn more about Delta...

What's right way of reading Delta Lake files with Auto Loader for further processing (e.g.. from Bronze layer to Silver)?

Thank you!

8 REPLIES 8

-werners-
Esteemed Contributor III

As the error mentions: autoloader and delta do not mix.

but there is change data feed on delta lake (as a source):

https://learn.microsoft.com/en-us/azure/databricks/delta/delta-change-data-feed

Like that you do not have to read the whole delta table but only ingest changes.

Vladif1
New Contributor II

Autoloader doesn't support reading from Delta Lake tables? any other format is supported except delta?

Thank you!

-werners-
Esteemed Contributor III

you can check for yourself:

https://learn.microsoft.com/en-us/azure/databricks/ingestion/auto-loader/

"Auto Loader can ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats"

And it makes sense. Autoloader is a tool to identify what you have already processed.

Delta lake is more than just some files, it has a transaction log.

Anonymous
Not applicable

Hi @Vlad Feiginโ€‹ 

Hope everything is going great.

Just wanted to check in if you were able to resolve your issue. If yes, would you be happy to mark an answer as best so that other members can find the solution more quickly? If not, please tell us so we can help you. 

Cheers!

-werners-
Esteemed Contributor III

Autoloader can't read delta lake.
To use Delta Lake one can use Change Data Feed (with or without streaming).
https://docs.databricks.com/en/ingestion/cloud-object-storage/auto-loader/options.html
And it makes sense: one needs to process the delta lake log to know what files contain the actual data and read deletion vectors.

Saqlain12
New Contributor II

The Delta Executor is a powerful tool designed to streamline the execution of data processing tasks in cloud environments. It enhances performance by optimizing resource utilization and provides a flexible framework for managing complex workflows. With its support for various data formats and integration with popular data storage solutions, users can easily implement scalable solutions. Additionally, the Delta Executor ensures data consistency and reliability through transaction support. This makes it an essential component for modern data engineering and analytics pipelines.

Johni1
New Contributor II

Panda
Valued Contributor

@Vladif1 The error occurs because the cloudFiles format in Auto Loader is meant for reading raw file formats like CSV, JSON ... for ingestion for more Format Support. For Delta tables, you should use the Delta format directly.

 

#Sample Example

bronze_path = "/mnt/bronze_layer"
silver_path = "/mnt/silver_layer"

raw_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{bronze_path}/_schema_checkpoint")
    .load("/mnt/raw_data_path")
)

(raw_df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{bronze_path}/_checkpoint")
    .start(bronze_path)
)

bronze_df = (
    spark.readStream
    .format("delta")  # Delta format for reading
    .load(bronze_path)  # Path to Bronze Delta table
)

# Perform any necessary transformations for the Silver layer.

silver_df = bronze_df.withColumn("processed_timestamp", current_timestamp())

# Write the transformed data to the Silver layer
(silver_df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{silver_path}/_checkpoint")
    .start(silver_path)
)

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group