cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Auto Loader

JissMathew
New Contributor II
from pyspark.sql.types import StructType, StructField, LongType, StringType, TimestampType
from pyspark.sql import functions as F, Window
from delta.tables import DeltaTable
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("BronzeLayerUpdater")

# Define schema for the incoming data
schema = StructType([
    StructField("ID", LongType(), False),
    StructField("Name", StringType(), False),
    StructField("Address", StringType(), False),
    StructField("date", StringType(), False)  # Initial string type for date
])

def update_bronze_layer(landing_folder_path, bronze_table, bronze_checkpoint_path😞
    logger.info(f"Ingesting data for {bronze_table} and building bronze layer...")

    try:
        # Read the streaming data with Auto Loader
        raw_stream = (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.schemaLocation", f"{bronze_checkpoint_path}/schemas/{bronze_table}")
            .option("header", "true")
            .schema(schema)
            .option("cloudFiles.inferColumnTypes", "true")
            .load(landing_folder_path)
            .withColumn("file_name", F.input_file_name())
            .withColumn("date", F.to_timestamp("date", "M/d/yyyy"))  # Convert date to timestamp
            .withColumn("injection_date", F.current_timestamp())  # Add injection timestamp
        )

        # Process and upsert data to the Delta table
        (raw_stream.writeStream
            .foreachBatch(lambda batch_df, batch_id: merge_into_bronze_table(batch_df, bronze_table))
            .option("checkpointLocation", f"{bronze_checkpoint_path}/checkpoints/{bronze_table}")
            .trigger(availableNow=True)
            .outputMode("append")
            .start()
            .awaitTermination()
        )

    except Exception as e:
        logger.error(f"Error in update_bronze_layer: {e}")
        raise e

def merge_into_bronze_table(batch_df, bronze_table😞
    """
    Perform upserts into the Delta table using merge.
    """
    try:
        logger.info("Starting merge operation...")

        # Deduplicate source data to avoid multiple matches
        deduplicated_batch_df = (
            batch_df.withColumn("row_number", F.row_number().over(Window.partitionBy("ID").orderBy(F.desc("injection_date"))))
            .filter("row_number = 1")
            .drop("row_number")
        )

        # Check if the table exists
        if spark.catalog.tableExists(bronze_table):
            logger.info(f"Delta table {bronze_table} exists. Performing merge...")
            delta_table = DeltaTable.forName(spark, bronze_table)

            (delta_table.alias("target")
                .merge(deduplicated_batch_df.alias("source"), "target.ID = source.ID")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute())

            logger.info("Merge operation completed successfully.")
        else:
            logger.warning(f"Delta table {bronze_table} does not exist. Creating a new table...")
            deduplicated_batch_df.write.format("delta").mode("overwrite").saveAsTable(bronze_table)
            logger.info("Delta table created successfully.")

    except Exception as e:
        logger.error(f"Error during merge operation: {e}")
        raise e
 
I recently tried the Auto Loader, and I hope this approach resolves my previous challenges. When implementing the Auto Loader, can we effectively capture data changes using merge logic? The code appears to be working successfully.
1 REPLY 1

RiyazAli
Valued Contributor

If you intend to capture data changes, take a look at this doc, which talks about change data feed in Databricks.

Riz

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group