Auto Loader
12-03-2024 10:39 PM
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
from pyspark.sql import functions as F, Window
from delta.tables import DeltaTable
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BronzeLayerUpdater")
# Define schema for the incoming data
schema = StructType([
    StructField("ID", LongType(), False),
    StructField("Name", StringType(), False),
    StructField("Address", StringType(), False),
    StructField("date", StringType(), False)  # Initially a string; converted to timestamp below
])

def update_bronze_layer(landing_folder_path, bronze_table, bronze_checkpoint_path):
    logger.info(f"Ingesting data for {bronze_table} and building bronze layer...")
    try:
        # Read the streaming data with Auto Loader
        raw_stream = (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.schemaLocation", f"{bronze_checkpoint_path}/schemas/{bronze_table}")
            .option("header", "true")
            .schema(schema)
            .option("cloudFiles.inferColumnTypes", "true")
            .load(landing_folder_path)
            .withColumn("file_name", F.input_file_name())
            .withColumn("date", F.to_timestamp("date", "M/d/yyyy"))  # Convert date string to timestamp
            .withColumn("injection_date", F.current_timestamp())  # Record the ingestion timestamp
        )

        # Process and upsert data to the Delta table
        (raw_stream.writeStream
            .foreachBatch(lambda batch_df, batch_id: merge_into_bronze_table(batch_df, bronze_table))
            .option("checkpointLocation", f"{bronze_checkpoint_path}/checkpoints/{bronze_table}")
            .trigger(availableNow=True)
            .outputMode("append")
            .start()
            .awaitTermination()
        )
    except Exception as e:
        logger.error(f"Error in update_bronze_layer: {e}")
        raise

def merge_into_bronze_table(batch_df, bronze_table):
    """
    Perform upserts into the Delta table using merge.
    """
    try:
        logger.info("Starting merge operation...")

        # Deduplicate source data to avoid multiple matches on the merge key
        deduplicated_batch_df = (
            batch_df.withColumn(
                "row_number",
                F.row_number().over(Window.partitionBy("ID").orderBy(F.desc("injection_date")))
            )
            .filter("row_number = 1")
            .drop("row_number")
        )

        # Check if the table exists
        if spark.catalog.tableExists(bronze_table):
            logger.info(f"Delta table {bronze_table} exists. Performing merge...")
            delta_table = DeltaTable.forName(spark, bronze_table)
            (delta_table.alias("target")
                .merge(deduplicated_batch_df.alias("source"), "target.ID = source.ID")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())
            logger.info("Merge operation completed successfully.")
        else:
            logger.warning(f"Delta table {bronze_table} does not exist. Creating a new table...")
            deduplicated_batch_df.write.format("delta").mode("overwrite").saveAsTable(bronze_table)
            logger.info("Delta table created successfully.")
    except Exception as e:
        logger.error(f"Error during merge operation: {e}")
        raise
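For context, a minimal sketch of how these functions might be invoked; the landing path, table name, and checkpoint path below are illustrative placeholders, not values from the original post:

update_bronze_layer(
    landing_folder_path="/Volumes/dev/landing/customers",       # illustrative landing folder
    bronze_table="dev.bronze.customers",                         # illustrative target table
    bronze_checkpoint_path="/Volumes/dev/checkpoints/customers"  # illustrative checkpoint root
)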
I recently tried Auto Loader, and I hope this approach resolves my earlier challenges. The code appears to be working. When implementing Auto Loader this way, can we effectively capture data changes using the merge logic?
Jiss Mathew
India
1 REPLY
12-04-2024 12:32 AM
If you intend to capture data changes, take a look at the documentation on change data feed in Databricks.
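For reference, here is a minimal sketch of what that looks like in practice; the table name and starting version are illustrative placeholders, not part of the original post:

# Enable the change data feed on the bronze table (illustrative table name)
spark.sql("ALTER TABLE dev.bronze.customers SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Read the row-level changes produced by subsequent MERGE operations;
# Delta adds _change_type, _commit_version, and _commit_timestamp columns
changes_df = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)  # illustrative starting point
    .table("dev.bronze.customers"))
changes_df.show()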
Riz

