<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Updating records with auto loader in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/updating-records-with-auto-loader/m-p/92479#M38446</link>
    <description>&lt;P&gt;You can use foreachBatch to write to an arbitrary table. See &lt;A href="https://community.databricks.com/t5/data-engineering/foreachbatch/td-p/48981" target="_self"&gt;this page&lt;/A&gt; for an example of running arbitrary SQL code during an Auto Loader/streaming run.&lt;/P&gt;</description>
    <pubDate>Tue, 01 Oct 2024 17:09:34 GMT</pubDate>
    <dc:creator>adriennn</dc:creator>
    <dc:date>2024-10-01T17:09:34Z</dc:date>
    <item>
      <title>Updating records with auto loader</title>
      <link>https://community.databricks.com/t5/data-engineering/updating-records-with-auto-loader/m-p/92427#M38436</link>
      <description>&lt;P&gt;I want to ingest JSON files from an S3 bucket into a Databricks table using Auto Loader.&lt;/P&gt;&lt;P&gt;A job runs every few hours to write the combined JSON data to the table.&lt;/P&gt;&lt;P&gt;Some records might be updates to existing records, identifiable by a specific key.&lt;/P&gt;&lt;P&gt;When a new record arrives with the same key as an existing one, I want to update the existing record.&lt;/P&gt;&lt;P&gt;Can this be done with Auto Loader? If not, what is the best approach?&lt;/P&gt;</description>
      <pubDate>Tue, 01 Oct 2024 12:00:20 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/updating-records-with-auto-loader/m-p/92427#M38436</guid>
      <dc:creator>databrickser</dc:creator>
      <dc:date>2024-10-01T12:00:20Z</dc:date>
    </item>
    <item>
      <title>Re: Updating records with auto loader</title>
      <link>https://community.databricks.com/t5/data-engineering/updating-records-with-auto-loader/m-p/92479#M38446</link>
      <description>&lt;P&gt;You can use foreachBatch to write to an arbitrary table. See &lt;A href="https://community.databricks.com/t5/data-engineering/foreachbatch/td-p/48981" target="_self"&gt;this page&lt;/A&gt; for an example of running arbitrary SQL code during an Auto Loader/streaming run.&lt;/P&gt;</description>
      <pubDate>Tue, 01 Oct 2024 17:09:34 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/updating-records-with-auto-loader/m-p/92479#M38446</guid>
      <dc:creator>adriennn</dc:creator>
      <dc:date>2024-10-01T17:09:34Z</dc:date>
    </item>
    <item>
      <title>Re: Updating records with auto loader</title>
      <link>https://community.databricks.com/t5/data-engineering/updating-records-with-auto-loader/m-p/92483#M38449</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/124051"&gt;@databrickser&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;It is possible to use Auto Loader together with foreachBatch to update existing records.&lt;BR /&gt;The code snippet below shows a working example. It reads CSV files from ADLS, but the same pattern applies to JSON files in S3:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from datetime import datetime
from pyspark.sql import DataFrame
from pyspark.sql.functions import input_file_name, regexp_extract, col
from delta.tables import DeltaTable  # Delta Lake Python API (provides MERGE)

# Define the base path for the files
basePath = "abfss://landing@&amp;lt;&amp;gt;.dfs.core.windows.net"

# Define the path pattern to match all files
pathPattern = f"{basePath}/*/*.csv"

# Get today's date and format it
today_str = datetime.now().strftime("%Y-%m-%d")

# Load the data using Auto Loader
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.includeExistingFiles", "true")
      .option("cloudFiles.schemaLocation", f"{basePath}/_schema")
      .option("path", pathPattern)
      .load())

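# Add a column with the full path of the source file
# (on Unity Catalog compute, col("_metadata.file_path") is the recommended replacement for input_file_name())
df = df.withColumn("file_path", input_file_name())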

# Extract the date using a regular expression
df = df.withColumn("file_date", regexp_extract(col("file_path"), r'(\d{4}-\d{2}-\d{2})', 1))

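def merge_to_delta(microBatchDF: DataFrame, batchId: int):
    # Inside foreachBatch, use the micro-batch's own SparkSession
    spark_session = microBatchDF.sparkSession

    # Target Delta table (must already exist with a matching schema)
    deltaTable = DeltaTable.forName(spark_session, "dev_catalogue.db_bronze.autoloader_merge")

    # MERGE fails if several source rows match the same target row,
    # so keep a single row per key within the micro-batch
    # (which of the duplicates survives is arbitrary here)
    microBatchDF = microBatchDF.dropDuplicates(["Date"])

    # Upsert: update rows whose key already exists, insert the rest.
    # "Date" is the merge key in this example; replace it with your own key column.
    (deltaTable.alias("target")
     .merge(
         microBatchDF.alias("source"),
         "target.Date = source.Date"
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute()
    )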

# Use foreachBatch to run the merge function on every micro-batch
query = (df.writeStream
         .foreachBatch(merge_to_delta)
         .option("checkpointLocation", f"{basePath}/_checkpoints/autoloader_merge")
         .trigger(availableNow=True)  # process all new files, then stop (suits a job that runs every few hours)
         .start())&lt;/LI-CODE&gt;&lt;P&gt;That said, I would recommend not implementing it this way and instead following the standard Medallion Architecture pattern, which gives you a cleaner, more maintainable solution.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Medallion Architecture Approach:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Bronze Layer – Raw Data&lt;/STRONG&gt;: Use Auto Loader to ingest your JSON files from S3 directly into a Bronze Delta table without performing any transformations or updates. This layer stores the raw data exactly as it arrived.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;bronze_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/bronze-schema")  # Schema location for Auto Loader
    .load("s3://your-bucket/path/to/bronze-files")
)
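
# Optional sketch, assuming you want lineage columns in Bronze so that the
# Silver step can later tell which version of a record is the newest.
# _metadata is the file metadata column exposed by file-based sources such as
# Auto Loader; the output column names (source_file, source_file_mtime,
# ingested_at) are only suggestions.
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, current_timestamp


def with_lineage(df: DataFrame):
    """Add the source file path, its modification time and an ingestion timestamp."""
    return (df
            .withColumn("source_file", col("_metadata.file_path"))
            .withColumn("source_file_mtime", col("_metadata.file_modification_time"))
            .withColumn("ingested_at", current_timestamp()))


# To apply it, uncomment the next line before starting the Bronze stream:
# bronze_df = with_lineage(bronze_df)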

(bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/bronze-checkpoint")
    .trigger(availableNow=True)  # process all new files, then stop
    .toTable("bronze_table"))&lt;/LI-CODE&gt;&lt;P&gt;&lt;STRONG&gt;Silver Layer – Cleaned Data&lt;/STRONG&gt;: Once the raw data is in the Bronze table, apply the necessary transformations and MERGE the Bronze rows into the Silver table, which holds the cleaned, deduplicated data:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;# Read data from the Bronze table
bronze_data = spark.read.table("bronze_table")

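# Optional sketch, assuming the Bronze rows carry a column that orders the
# versions of a record ("updated_at" below is hypothetical; a source-file
# modification time would also work). Bronze keeps every version, so the same
# id can appear several times, and MERGE fails when more than one source row
# matches the same target row. Keeping only the newest row per id avoids that.
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import col, row_number


def latest_per_key(df: DataFrame, key_col: str, order_col: str):
    """Return one row per key_col value: the row with the largest order_col."""
    newest_first = Window.partitionBy(key_col).orderBy(col(order_col).desc())
    return (df
            .withColumn("_rn", row_number().over(newest_first))
            .filter(col("_rn") == 1)
            .drop("_rn"))


# Uncomment (with your own key and ordering columns) before the MERGE below:
# bronze_data = latest_per_key(bronze_data, "id", "updated_at")
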
# Register the DataFrame as a temporary view so that SQL can reference it
bronze_data.createOrReplaceTempView("bronze_data")

# MERGE: update existing records in the Silver table or insert new ones.
# MERGE fails if several bronze rows share the same id, so bronze_data must hold at most one row per id.
spark.sql("""
MERGE INTO silver_table AS silver
USING bronze_data AS bronze
ON silver.id = bronze.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
""")&lt;/LI-CODE&gt;&lt;P&gt;&lt;STRONG&gt;Why Use the Medallion Architecture?&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Scalability and Maintainability&lt;/STRONG&gt;: Separating raw data (Bronze) from cleaned data (Silver) gives each layer a distinct purpose, which makes the solution easier to manage and scale over time.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Flexibility for Future Enhancements&lt;/STRONG&gt;: With separate layers, it’s easier to introduce new logic, transformations, or schema changes.&lt;/P&gt;</description>
      <pubDate>Tue, 01 Oct 2024 18:33:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/updating-records-with-auto-loader/m-p/92483#M38449</guid>
      <dc:creator>filipniziol</dc:creator>
      <dc:date>2024-10-01T18:33:22Z</dc:date>
    </item>
  </channel>
</rss>

