from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
from pyspark.sql import functions as F, Window
from delta.tables import DeltaTable
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BronzeLayerUpdater")
# Define schema for the incoming data
schema = StructType([
    StructField("ID", LongType(), False),
    StructField("Name", StringType(), False),
    StructField("Address", StringType(), False),
    StructField("date", StringType(), False)  # Initial string type for date
])
def update_bronze_layer(landing_folder_path, bronze_table, bronze_checkpoint_path):
    logger.info(f"Ingesting data for {bronze_table} and building bronze layer...")
    try:
        # Read the streaming data with Auto Loader
        raw_stream = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.schemaLocation", f"{bronze_checkpoint_path}/schemas/{bronze_table}")
            .option("header", "true")
            .schema(schema)
            .option("cloudFiles.inferColumnTypes", "true")
            .load(landing_folder_path)
            .withColumn("file_name", F.input_file_name())
            .withColumn("date", F.to_timestamp("date", "M/d/yyyy"))  # Convert date to timestamp
            .withColumn("injection_date", F.current_timestamp())  # Add injection timestamp
        )
        # Process and upsert data to the Delta table
        (raw_stream.writeStream
            .foreachBatch(lambda batch_df, batch_id: merge_into_bronze_table(batch_df, bronze_table))
            .option("checkpointLocation", f"{bronze_checkpoint_path}/checkpoints/{bronze_table}")
            .trigger(availableNow=True)
            .outputMode("append")
            .start()
            .awaitTermination()
        )
    except Exception as e:
        logger.error(f"Error in update_bronze_layer: {e}")
        raise
def merge_into_bronze_table(batch_df, bronze_table):
    """
    Perform upserts into the Delta table using merge.
    """
    try:
        logger.info("Starting merge operation...")
        # Deduplicate source data to avoid multiple matches on the merge key
        deduplicated_batch_df = (
            batch_df.withColumn("row_number", F.row_number().over(Window.partitionBy("ID").orderBy(F.desc("injection_date"))))
            .filter("row_number = 1")
            .drop("row_number")
        )
        # Check if the table exists
        if spark.catalog.tableExists(bronze_table):
            logger.info(f"Delta table {bronze_table} exists. Performing merge...")
            delta_table = DeltaTable.forName(spark, bronze_table)
            (delta_table.alias("target")
                .merge(deduplicated_batch_df.alias("source"), "target.ID = source.ID")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())
            logger.info("Merge operation completed successfully.")
        else:
            logger.warning(f"Delta table {bronze_table} does not exist. Creating a new table...")
            deduplicated_batch_df.write.format("delta").mode("overwrite").saveAsTable(bronze_table)
            logger.info("Delta table created successfully.")
    except Exception as e:
        logger.error(f"Error during merge operation: {e}")
        raise
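For context, this is roughly how I invoke the pipeline from the driver notebook; the storage paths and table name below are placeholders, not my actual environment:

# Placeholder paths and table name, for illustration only
landing_path = "abfss://landing@<storage_account>.dfs.core.windows.net/customers"
checkpoint_path = "abfss://bronze@<storage_account>.dfs.core.windows.net/_checkpoints"
bronze_table_name = "bronze.customers"
update_bronze_layer(landing_path, bronze_table_name, checkpoint_path)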
I recently tried Auto Loader, and I hope this approach resolves my previous challenges. When ingesting with Auto Loader like this, can we reliably capture data changes (updates to existing IDs and inserts of new ones) by applying merge logic inside foreachBatch? The code appears to be working so far.
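To make the question concrete, here is the behaviour I expect from the merge on a hand-built micro-batch; the IDs, values, and table name are made up purely for illustration:

# Hypothetical test batch: assume ID 1 already exists in the bronze table and ID 999 does not
sample_batch = (
    spark.createDataFrame(
        [(1, "Alice", "New Address 1", "1/15/2024"),
         (999, "Bob", "Address 9", "1/15/2024")],
        schema)
    .withColumn("file_name", F.lit("manual_test.csv"))  # add the columns the streaming path normally adds
    .withColumn("date", F.to_timestamp("date", "M/d/yyyy"))
    .withColumn("injection_date", F.current_timestamp())
)
merge_into_bronze_table(sample_batch, "bronze.customers")
# Expected outcome: the existing row for ID 1 is updated in place, and ID 999 is inserted as a new row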