cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Updating records with auto loader

databrickser
New Contributor

I want to ingest JSON files from an S3 bucket into a Databricks table using an autoloader.

A job runs every few hours to write the combined JSON data to the table.

Some records might be updates to existing records, identifiable by a specific key.

I want to update existing records if a new record with the same key arrives.

Can this be done with an autoloader? If not, what is the best approach to achieve this?

2 REPLIES 2

adriennn
Contributor II

you can use forEachBatch to write an arbitrary table. See this page for an example of running arbitrary SQL code during an autoloader/streaming run.

filipniziol
Contributor III

Hi @databrickser ,

In theory, it is possible to use Auto Loader with foreachBatch to update existing records.
The below code snippet show a working solution:

from datetime import datetime
from pyspark.sql import DataFrame
from pyspark.sql.functions import input_file_name, regexp_extract, col, lit, to_date
from pyspark.sql.streaming import DataStreamWriter
from delta.tables import DeltaTable  # Import DeltaTable

# Define the base path for the files
basePath = "abfss://landing@<>.dfs.core.windows.net"

# Define the path pattern to match all files
pathPattern = f"{basePath}/*/*.csv"

# Get today's date and format it
today_str = datetime.now().strftime("%Y-%m-%d")

# Load the data using AutoLoader
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.includeExistingFiles", "true")
      .option("cloudFiles.schemaLocation", f"{basePath}/_schema")
      .option("path", pathPattern)
      .load())

# Add a column to extract the full file path
df = df.withColumn("file_path", input_file_name())

# Extract the date using a regular expression
df = df.withColumn("file_date", regexp_extract(col("file_path"), r'(\d{4}-\d{2}-\d{2})', 1))

def merge_to_delta(microBatchDF: DataFrame, batchId: int):
    # Define the Delta table to merge into
    deltaTable = DeltaTable.forName(spark, "dev_catalogue.db_bronze.autoloader_merge")
    
    # Perform the merge operation
    (deltaTable.alias("target")
     .merge(
         microBatchDF.alias("source"),
         "target.Date = source.Date" 
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute()
    )

# Use foreachBatch to apply the merge function
query = (filtered_df.writeStream
         .foreachBatch(merge_to_delta)
         .option("checkpointLocation", f"{basePath}/_checkpoints/autoloader_merge")
         .start())

On the other hand, I would recommend not to implement this like this and follow the standard Medallion Architecture pattern.  This will give you a better, more maintainable solution.

Medallion Architecture Approach:

Bronze Layer โ€“ Raw Data: Use Auto Loader to ingest your JSON files from S3 directly into a Bronze delta table without performing any transformations or updates. This layer should store the raw data.

bronze_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/bronze-schema")  # Schema location for Auto Loader
    .load("s3://your-bucket/path/to/bronze-files")
)

bronze_df.writeStream.format("delta").option("checkpointLocation", "/path/to/bronze-checkpoint").table("bronze_table")

Silver Layer โ€“ Cleaned Data: Once the raw data is ingested into the Bronze table, perform the necessary transformations and apply the MERGE operation between the Bronze and Silver tables. The Silver table will contain cleaned and merged data:

# Read data from the Bronze table
bronze_data = spark.read.format("delta").table("bronze_table")

# Perform the MERGE operation to update existing records or insert new ones in the Silver table
spark.sql("""
MERGE INTO silver_table AS silver
USING bronze_data AS bronze
ON silver.id = bronze.id
WHEN MATCHED THEN
  UPDATE SET silver.* = bronze.*
WHEN NOT MATCHED THEN
  INSERT *
""")

Why Use the Medallion Architecture?

Scalability and Maintainability: Separating raw data (Bronze), cleaned data (Silver), ensures that each layer serves a distinct purpose, making it easier to manage and scale your solution over time.

Flexibility for Future Enhancements: Itโ€™s easier to introduce new logic, transformations, or changes in schema by maintaining separate layers.

 

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group