01-20-2025 09:50 PM
Hi team,
Good day! I would like to know how we can perform an incremental load using Autoloader.
I am uploading one file to DBFS and writing it into a table. When I upload a similar file to the same directory, it does not perform an incremental load; instead, I see duplicate rows in the final table where I am writing.
Below is the code I am using. Is there anything I am missing here?
Country | Citizens
--------|---------
India   | 10
USA     | 5
China   | 10
India   | 10
Canada  | 40
Thank you!
Accepted Solutions
01-21-2025 12:49 AM
Hi @Kanna,
Good day! Autoloader is incremental at the file level: it processes each new file exactly once, but it does not deduplicate rows across files, so a second file containing the same data is simply appended to the target table. The problem stems from missing deduplication or upsert logic in your current write. Here's an approach that combines Databricks Autoloader with Delta Lake merge to handle incremental loads and avoid duplicates.
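Your exact code isn't shown in the post, but I assume it is a plain append along the lines of the sketch below (paths are placeholders). With this pattern Autoloader's checkpoint guarantees each file is read only once, yet nothing prevents identical rows from different files being appended twice:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical append-only Autoloader write (placeholder paths) that reproduces
# the duplicate rows: each new file is read once, but its rows are appended as-is.
schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Citizens", IntegerType(), True)
])

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .schema(schema)
    .load("dbfs:/path/to/source")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/path/to/append_checkpoint")
    .start("dbfs:/path/to/delta/table"))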
Enhanced Implementation with Upsert Logic
Below is a complete solution that ensures incremental loads using foreachBatch and DeltaTable.merge for upserts:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
# Define paths
table_path = "dbfs:/path/to/delta/table"
bad_records_path = "dbfs:/path/to/bad/records"
checkpoint_path = "dbfs:/path/to/checkpoint"
loading_path = "dbfs:/path/to/source"
# Define the schema for your input data
schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Citizens", IntegerType(), True)
])
# Define the merge logic for upserts
def upsert_to_delta(microBatchOutputDF, batchId):
    # Deduplicate within the micro-batch so the merge never sees two source rows
    # for the same key (merge fails if multiple source rows match one target row)
    batch_df = microBatchOutputDF.dropDuplicates(["Country"])
    # Check if the Delta table exists
    if DeltaTable.isDeltaTable(spark, table_path):
        # Define the Delta table
        delta_table = DeltaTable.forPath(spark, table_path)
        # Perform the merge operation
        (delta_table.alias("tgt")
            .merge(
                batch_df.alias("src"),
                "tgt.Country = src.Country"  # Merge condition based on the primary key (e.g., Country)
            )
            .whenMatchedUpdateAll()      # Update all columns if a match is found
            .whenNotMatchedInsertAll()   # Insert all columns if no match is found
            .execute())
    else:
        # If the Delta table doesn't exist yet, write the batch as a new Delta table
        (batch_df.write
            .format("delta")
            .mode("overwrite")
            .save(table_path))
# Read streaming data using Autoloader
sdf = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("badRecordsPath", bad_records_path)
    .schema(schema)        # Explicit schema for the input data
    .load(loading_path))   # Source directory
# Write streaming data to the Delta table with upsert logic.
# With foreachBatch, the sink format and path are handled inside upsert_to_delta,
# so only the checkpoint location and output mode are set here.
(sdf.writeStream
    .foreachBatch(upsert_to_delta)                  # Apply the upsert logic per micro-batch
    .option("checkpointLocation", checkpoint_path)
    .outputMode("update")
    .start())
Key Features of the Solution
Upsert Logic:
- Uses DeltaTable.merge to ensure incremental updates by matching records based on a primary key (e.g., Country).
- Updates existing records and inserts new ones, preventing duplicates.
Schema Handling:
- An explicit schema keeps the input column types stable; cloudFiles.inferColumnTypes only takes effect if you let Autoloader infer the schema instead of supplying one.
Error Handling:
- badRecordsPath captures problematic records for later review.
Incremental Processing:
- Autoloader processes only new files, ensuring efficient incremental loading.
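To confirm the file-level incremental behaviour, you can inspect Autoloader's ingestion state. This is an optional check and assumes a recent Databricks Runtime that provides the cloud_files_state SQL function; checkpoint_path is the variable defined above.

# Optional: list the files Autoloader has already ingested for this stream.
# cloud_files_state reads the state stored under the stream's checkpoint location.
spark.sql(
    f"SELECT * FROM cloud_files_state('{checkpoint_path}')"
).show(truncate=False)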
Why This Works
- Delta Lake: Tracks changes and enables ACID transactions, ensuring data integrity.
- Autoloader: Automatically detects and loads new files from the source directory.
- Deduplication: The merge condition prevents duplicate rows in the target table.
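As a quick sanity check (not part of the pipeline itself), you can verify that the target table ends up with at most one row per Country after re-uploading a file; table_path is the variable defined above.

from pyspark.sql import functions as F

# Read the target Delta table and look for keys that appear more than once.
final_df = spark.read.format("delta").load(table_path)
(final_df.groupBy("Country")
    .count()
    .filter(F.col("count") > 1)
    .show())   # an empty result means the merge kept the table duplicate-free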
Reference
For additional details, check the Databricks Autoloader Documentation and Delta Lake Documentation.
Let me know if this resolves your issue or if you need further assistance.
Thanks,
Boitumelo