
Autoloader clarification

Kanna
New Contributor II

Hi team,

Good day! I would like to know how we can perform an incremental load using Autoloader.
I am uploading one file to DBFS and writing it into a table. When I upload a similar file to the same directory, it does not perform an incremental load; instead, I see duplicate rows in the final table where I am writing.

Below is the code I am using. Is there anything I am missing here?

df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", f"{source_dir}/schemaInfer")
    .option("header", "true")
    .load(source_dir))

 

(
    df.writeStream.option("checkpointLocation", "dbfs:/FileStore/streamingwritetest/checkpointlocation1")
    .outputMode("append")
    .queryName("writestreamquery")
    .toTable("stream.writestream")
)
 
File: 
 
Country   Citizens
India     10
USA       5
China     10
India     10
Canada    40

Thank you!

Accepted Solution

boitumelodikoko
Contributor

Hi @Kanna,

Good day! Based on the issue you’re encountering, I believe the problem stems from missing deduplication or upsert logic in your current implementation. Here's an approach that combines the power of Databricks Autoloader and Delta Lake to handle incremental loads and avoid duplicates effectively.

Enhanced Implementation with Upsert Logic

Below is a complete solution that ensures incremental loads using foreachBatch and DeltaTable.merge for upserts:

 

from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Define paths
table_path = "dbfs:/path/to/delta/table"
bad_records_path = "dbfs:/path/to/bad/records"
checkpoint_path = "dbfs:/path/to/checkpoint"
loading_path = "dbfs:/path/to/source"

# Define the schema for your input data
schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Citizens", IntegerType(), True)
])

# Define the merge logic for upserts
def upsert_to_delta(microBatchOutputDF, batchId):
    # Check if the Delta table exists
    if DeltaTable.isDeltaTable(spark, table_path):
        # Define the Delta table
        delta_table = DeltaTable.forPath(spark, table_path)

        # Perform the merge operation
        (delta_table.alias("tgt")
         .merge(
            microBatchOutputDF.alias("src"),
            "tgt.Country = src.Country"  # Merge condition based on the primary key (e.g., Country)
         )
         .whenMatchedUpdateAll()  # Update all columns if a match is found
         .whenNotMatchedInsertAll()  # Insert all columns if no match is found
         .execute())
    else:
        # If Delta table doesn't exist, write the batch as a new Delta table
        (microBatchOutputDF.write
         .format("delta")
         .mode("overwrite")
         .save(table_path))

# Read streaming data using Autoloader
sdf = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")  # The source CSVs have a header row; skip it instead of reading it as data
    .option("cloudFiles.inferColumnTypes", "true")  # Only takes effect when no explicit schema is supplied
    .option("badRecordsPath", bad_records_path)
    .schema(schema)  # Explicit schema keeps the column types stable
    .load(loading_path))  # Source directory

# Write streaming data with upsert logic.
# foreachBatch is the sink here, so the Delta write happens inside upsert_to_delta;
# sink-side settings such as format("delta") or mergeSchema are not needed on the writer.
(sdf.writeStream
    .foreachBatch(upsert_to_delta)  # Apply the upsert logic to each micro-batch
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .start())
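
To sanity-check the result, query the target table after re-uploading the same file. A minimal check, assuming the table_path above and that Country is the unique key:

# Each country should appear exactly once (count = 1), even after the
# same file has been uploaded to the source directory more than once.
result = spark.read.format("delta").load(table_path)
result.groupBy("Country").count().orderBy("Country").show()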

 

 

Key Features of the Solution

  1. Upsert Logic:

    • Uses DeltaTable.merge to ensure incremental updates by matching records based on a primary key (e.g., Country).
    • Updates existing records and inserts new ones, preventing duplicates.
  2. Schema Handling:

    • An explicit schema keeps the column types stable; cloudFiles.inferColumnTypes only takes effect when you rely on Autoloader's schema inference instead of supplying a schema.
  3. Error Handling:

    • badRecordsPath captures problematic records for later review.
  4. Incremental Processing:

    • Autoloader tracks already-ingested files in the checkpoint and processes only new files, ensuring efficient incremental loading (see the trigger sketch after this list for running it as a scheduled job).
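
If you would rather run the load as a scheduled job than as a continuously running stream, the same file tracking applies. A minimal sketch using the availableNow trigger, reusing sdf, upsert_to_delta, and checkpoint_path from above (the trigger choice is an assumption on my part, not part of the original code):

# Process only the files that have arrived since the last run, then stop.
# Autoloader records already-ingested files in the checkpoint, so a re-run
# does not reprocess or duplicate files it has already loaded.
(sdf.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start())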

Why This Works

  • Delta Lake: Tracks changes and enables ACID transactions, ensuring data integrity.
  • Autoloader: Automatically detects and loads new files from the source directory.
  • Deduplication: The merge condition prevents duplicate rows in the target table (see the sketch below for source files that repeat a key).
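
One caveat: deduplication also matters within a single micro-batch. The sample file above lists India twice, and MERGE allows at most one source row to modify a given target row. A minimal sketch that deduplicates each batch before the merge, assuming one row per Country is the desired outcome (which of the duplicate rows survives is left to dropDuplicates and is an assumption on my part):

def upsert_to_delta_deduped(microBatchOutputDF, batchId):
    # Keep one row per Country within the batch. Without this, a file that
    # repeats a key either fails the MERGE (when the key already exists in
    # the target) or inserts duplicate rows (when it does not).
    deduped = microBatchOutputDF.dropDuplicates(["Country"])
    upsert_to_delta(deduped, batchId)

# Pass upsert_to_delta_deduped to foreachBatch instead of upsert_to_delta.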

Reference

For additional details, check the Databricks Autoloader Documentation and Delta Lake Documentation.

Let me know if this resolves your issue or if you need further assistance.


Thanks,
Boitumelo

