Hi @databrickser ,
In theory, it is possible to use Auto Loader with foreachBatch to update existing records. The code snippet below shows a working solution:
from datetime import datetime

from pyspark.sql import DataFrame
from pyspark.sql.functions import input_file_name, regexp_extract, col
from delta.tables import DeltaTable
# Define the base path for the files
basePath = "abfss://landing@<storage-account>.dfs.core.windows.net"
# Define the path pattern to match all files
pathPattern = f"{basePath}/*/*.csv"
# Get today's date and format it
today_str = datetime.now().strftime("%Y-%m-%d")
# Load the data using Auto Loader
df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.includeExistingFiles", "true")
      .option("cloudFiles.schemaLocation", f"{basePath}/_schema")
      .load(pathPattern))
# Add a column with the full source file path
df = df.withColumn("file_path", input_file_name())

# Extract the date portion of the path using a regular expression
df = df.withColumn("file_date", regexp_extract(col("file_path"), r'(\d{4}-\d{2}-\d{2})', 1))

# Keep only today's files (this is the filtered_df used by the writeStream below)
filtered_df = df.filter(col("file_date") == today_str)
def merge_to_delta(microBatchDF: DataFrame, batchId: int):
    # Target Delta table to merge into
    deltaTable = DeltaTable.forName(spark, "dev_catalogue.db_bronze.autoloader_merge")

    # Upsert the micro-batch: update rows whose Date already exists, insert the rest
    (deltaTable.alias("target")
        .merge(
            microBatchDF.alias("source"),
            "target.Date = source.Date"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
# Use foreachBatch to apply the merge for each micro-batch
query = (filtered_df.writeStream
         .foreachBatch(merge_to_delta)
         .option("checkpointLocation", f"{basePath}/_checkpoints/autoloader_merge")
         .start())
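One caveat with this pattern: MERGE fails if a single micro-batch contains more than one source row for the same Date, so it is often worth deduplicating inside the batch function. A minimal sketch, assuming the most recent file should win (the tie-break on file_path is just an illustration); it is a drop-in replacement for merge_to_delta above:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

def merge_to_delta_dedup(microBatchDF: DataFrame, batchId: int):
    # Keep a single row per Date within the micro-batch
    w = Window.partitionBy("Date").orderBy(col("file_path").desc())
    deduped = (microBatchDF
               .withColumn("rn", row_number().over(w))
               .filter(col("rn") == 1)
               .drop("rn"))

    deltaTable = DeltaTable.forName(spark, "dev_catalogue.db_bronze.autoloader_merge")
    (deltaTable.alias("target")
        .merge(deduped.alias("source"), "target.Date = source.Date")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())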
On the other hand, I would recommend not implementing it like this and instead following the standard Medallion Architecture pattern. This will give you a better, more maintainable solution.
Medallion Architecture Approach:
Bronze Layer (Raw Data): Use Auto Loader to ingest your JSON files from S3 directly into a Bronze Delta table without performing any transformations or updates. This layer should store the raw data as-is.
bronze_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/path/to/bronze-schema")  # schema location for Auto Loader
    .load("s3://your-bucket/path/to/bronze-files")
)

(bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/bronze-checkpoint")
    .toTable("bronze_table"))
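Because the Silver step below is a batch MERGE, it is usually easiest to run the Bronze ingestion as a triggered stream that processes all pending files and then stops. A sketch of the same write with an availableNow trigger, assuming both steps are scheduled in one job:
bronze_query = (bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/bronze-checkpoint")
    .trigger(availableNow=True)   # ingest everything currently available, then stop
    .toTable("bronze_table"))

bronze_query.awaitTermination()   # wait for Bronze ingestion to finish before the Silver MERGE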
Silver Layer (Cleaned Data): Once the raw data is ingested into the Bronze table, perform the necessary transformations and apply the MERGE operation between the Bronze and Silver tables. The Silver table will contain the cleaned and merged data:
# Read data from the Bronze table and expose it to SQL as a temp view
bronze_data = spark.read.format("delta").table("bronze_table")
bronze_data.createOrReplaceTempView("bronze_data")

# MERGE to update existing records and insert new ones in the Silver table
spark.sql("""
    MERGE INTO silver_table AS silver
    USING bronze_data AS bronze
    ON silver.id = bronze.id
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
""")
Why Use the Medallion Architecture?
Scalability and Maintainability: Separating raw data (Bronze) from cleaned data (Silver) ensures that each layer serves a distinct purpose, making your solution easier to manage and scale over time.
Flexibility for Future Enhancements: It's easier to introduce new logic, transformations, or schema changes when the layers are kept separate.