Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-15-2024 05:09 PM
@Vladif1 The error occurs because the cloudFiles format in Auto Loader is meant for reading raw file formats like CSV, JSON ... for ingestion for more Format Support. For Delta tables, you should use the Delta format directly.
#Sample Example
bronze_path = "/mnt/bronze_layer"
silver_path = "/mnt/silver_layer"
raw_df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", f"{bronze_path}/_schema_checkpoint")
.load("/mnt/raw_data_path")
)
(raw_df
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", f"{bronze_path}/_checkpoint")
.start(bronze_path)
)
bronze_df = (
spark.readStream
.format("delta") # Delta format for reading
.load(bronze_path) # Path to Bronze Delta table
)
# Perform any necessary transformations for the Silver layer.
silver_df = bronze_df.withColumn("processed_timestamp", current_timestamp())
# Write the transformed data to the Silver layer
(silver_df
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", f"{silver_path}/_checkpoint")
.start(silver_path)
)