Panda
Valued Contributor

@Vladif1 The error occurs because the cloudFiles format in Auto Loader is meant for ingesting raw file formats such as CSV, JSON, etc. (see the Auto Loader "Format Support" documentation for the full list). To read from a Delta table, use the Delta format directly instead.

 

# Example: Auto Loader ingestion into Bronze, then Bronze -> Silver using the Delta format

# Storage locations of the Bronze and Silver Delta tables.
bronze_path = "/mnt/bronze_layer"
silver_path = "/mnt/silver_layer"

# Auto Loader settings: the raw files are JSON, and the schema tracking
# location is kept under the Bronze path.
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": f"{bronze_path}/_schema_checkpoint",
}

# Streaming read of the raw files through Auto Loader ("cloudFiles").
raw_reader = spark.readStream.format("cloudFiles").options(**autoloader_options)
raw_df = raw_reader.load("/mnt/raw_data_path")

# Configure the Bronze sink: append the raw stream to a Delta table, with the
# streaming checkpoint stored under the Bronze path.
bronze_writer = (
    raw_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{bronze_path}/_checkpoint")
)

# Start the streaming query that writes to the Bronze table.
bronze_writer.start(bronze_path)

# The Bronze table is a Delta table, so it is streamed with the "delta"
# format rather than "cloudFiles".
bronze_reader = spark.readStream.format("delta")
bronze_df = bronze_reader.load(bronze_path)  # Path to Bronze Delta table

# Perform any necessary transformations for the Silver layer.

# current_timestamp must be imported explicitly: nothing else in this snippet
# brings it into scope, so calling it without the import raises NameError.
from pyspark.sql.functions import current_timestamp

# Add a "processed_timestamp" column holding the processing time of each row.
silver_df = bronze_df.withColumn("processed_timestamp", current_timestamp())

# Configure the Silver sink: append the transformed stream to a Delta table,
# with the streaming checkpoint stored under the Silver path.
silver_writer = (
    silver_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{silver_path}/_checkpoint")
)

# Start the streaming query that writes to the Silver table.
silver_writer.start(silver_path)