<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: Autoloader clarification in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/autoloader-clarification/m-p/106414#M42482</link>
    <description>Reply thread for "Autoloader clarification" in Data Engineering: avoiding duplicate rows on incremental loads by combining Databricks Autoloader with Delta Lake MERGE in foreachBatch.</description>
    <pubDate>Tue, 21 Jan 2025 08:49:02 GMT</pubDate>
    <dc:creator>boitumelodikoko</dc:creator>
    <dc:date>2025-01-21T08:49:02Z</dc:date>
    <item>
      <title>Autoloader clarification</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-clarification/m-p/106392#M42474</link>
      <description>&lt;P&gt;Hi team,&lt;/P&gt;&lt;P&gt;Good day! I would like to know how to perform an incremental load using Autoloader.&lt;BR /&gt;I upload one file to DBFS and write it into a table. When I upload a similar file to the same directory, it does not perform an incremental load; instead, I see duplicate rows in the final table.&lt;/P&gt;&lt;P&gt;Below is the code I am using. Is there anything I am missing?&lt;/P&gt;&lt;LI-CODE lang="python"&gt;df = spark.readStream\
        .format('cloudFiles')\
        .option("cloudFiles.format", "csv")\
        .option("cloudFiles.schemaLocation", f'{source_dir}/schemaInfer')\
        .option('header', 'true')\
        .load(source_dir)

(
    df.writeStream.option("checkpointLocation",
        "dbfs:/FileStore/streamingwritetest/checkpointlocation1")
    .outputMode("append")
    .queryName("writestreamquery")
    .toTable("stream.writestream")
)&lt;/LI-CODE&gt;&lt;P&gt;File:&lt;/P&gt;&lt;TABLE&gt;&lt;TBODY&gt;&lt;TR&gt;&lt;TD&gt;Country&lt;/TD&gt;&lt;TD&gt;Citizens&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;India&lt;/TD&gt;&lt;TD&gt;10&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;USA&lt;/TD&gt;&lt;TD&gt;5&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;China&lt;/TD&gt;&lt;TD&gt;10&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;India&lt;/TD&gt;&lt;TD&gt;10&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;Canada&lt;/TD&gt;&lt;TD&gt;40&lt;/TD&gt;&lt;/TR&gt;&lt;/TBODY&gt;&lt;/TABLE&gt;&lt;P&gt;Thank you!&lt;/P&gt;</description>
      <pubDate>Tue, 21 Jan 2025 05:50:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-clarification/m-p/106392#M42474</guid>
      <dc:creator>Kanna</dc:creator>
      <dc:date>2025-01-21T05:50:09Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader clarification</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-clarification/m-p/106414#M42482</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/144894"&gt;@Kanna&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;Good day! Based on the issue you’re encountering, I believe the problem stems from missing deduplication or upsert logic in your current implementation. Here's an approach that combines the power of &lt;STRONG&gt;Databricks Autoloader&lt;/STRONG&gt; and &lt;STRONG&gt;Delta Lake&lt;/STRONG&gt; to handle incremental loads and avoid duplicates effectively.&lt;/P&gt;&lt;H3&gt;Enhanced Implementation with Upsert Logic&lt;/H3&gt;&lt;P&gt;Below is a complete solution that ensures incremental loads using foreachBatch and DeltaTable.merge for upserts:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

# Define paths
table_path = "dbfs:/path/to/delta/table"
bad_records_path = "dbfs:/path/to/bad/records"
checkpoint_path = "dbfs:/path/to/checkpoint"
loading_path = "dbfs:/path/to/source"

# Define the schema for your input data
schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Citizens", IntegerType(), True)
])

# Define the merge logic for upserts
def upsert_to_delta(microBatchOutputDF, batchId):
    # Drop duplicate keys within the micro-batch first: MERGE fails if
    # more than one source row matches the same target row
    batch_df = microBatchOutputDF.dropDuplicates(["Country"])

    # Check if the Delta table exists
    if DeltaTable.isDeltaTable(spark, table_path):
        delta_table = DeltaTable.forPath(spark, table_path)

        # Update matched rows, insert unmatched ones
        (delta_table.alias("tgt")
         .merge(
            batch_df.alias("src"),
            "tgt.Country = src.Country"  # Merge key, acting as the primary key
         )
         .whenMatchedUpdateAll()  # Update all columns if a match is found
         .whenNotMatchedInsertAll()  # Insert all columns if no match is found
         .execute())
    else:
        # If the Delta table doesn't exist yet, write the batch as a new table
        (batch_df.write
         .format("delta")
         .mode("overwrite")
         .save(table_path))
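# ---------------------------------------------------------------------
# Editor's aside (illustrative sketch, not part of the original reply):
# the merge above behaves like a keyed dictionary update. This plain-
# Python analogue shows the upsert semantics MERGE applies per batch.
def upsert_plain(target, batch, key="Country"):
    """Merge a list of row dicts into `target`, keyed on `key`."""
    for row in batch:
        target[row[key]] = row  # matched -> update; unmatched -> insert
    return target

# A re-uploaded file with an existing key updates the row in place
# instead of appending a duplicate.
tbl = upsert_plain({}, [{"Country": "India", "Citizens": 10}])
tbl = upsert_plain(tbl, [{"Country": "India", "Citizens": 12},
                         {"Country": "USA", "Citizens": 5}])
# tbl now holds exactly two rows: India (updated to 12) and USA (5)
# ---------------------------------------------------------------------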

# Read streaming data using Autoloader
sdf = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")  # the sample files include a header row
    .option("badRecordsPath", bad_records_path)
    .schema(schema)  # explicit schema, so no inference or schemaLocation is needed
    .load(loading_path))  # Source directory

# Write streaming data with the upsert applied per micro-batch.
# With foreachBatch the sink is defined inside the function, so no
# format or path is passed to start(); options such as mergeSchema
# and badRecordsPath belong on the batch writer, not here.
(sdf.writeStream
    .foreachBatch(upsert_to_delta)  # Apply the upsert logic
    .outputMode("update")  # MERGE rewrites rows, so "update" fits better than "append"
    .option("checkpointLocation", checkpoint_path)
    .start())&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;H3&gt;Key Features of the Solution&lt;/H3&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Upsert Logic&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Uses DeltaTable.merge to apply incremental updates by matching records on a key column (here, Country).&lt;/LI&gt;&lt;LI&gt;Updates existing records and inserts new ones, preventing duplicates.&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Schema Handling&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;An explicit schema keeps column names and types consistent across incoming files.&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Error Handling&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;badRecordsPath captures problematic records for later review.&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Incremental Processing&lt;/STRONG&gt;:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Autoloader processes only new files, keeping each load incremental and efficient.&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;H3&gt;Why This Works&lt;/H3&gt;&lt;UL&gt;&lt;LI&gt;&lt;STRONG&gt;Delta Lake&lt;/STRONG&gt;: Tracks changes and provides ACID transactions, ensuring data integrity.&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Autoloader&lt;/STRONG&gt;: Automatically detects and loads new files from the source directory.&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Deduplication&lt;/STRONG&gt;: The merge condition prevents duplicate rows in the target table.&lt;/LI&gt;&lt;/UL&gt;&lt;H3&gt;Reference&lt;/H3&gt;&lt;P&gt;For additional details, see the &lt;A target="_new" rel="noopener"&gt;&lt;SPAN&gt;Databricks Autoloader Documentation&lt;/SPAN&gt;&lt;/A&gt; and the &lt;A target="_new" rel="noopener"&gt;&lt;SPAN&gt;Delta Lake Documentation&lt;/SPAN&gt;&lt;/A&gt;.&lt;/P&gt;&lt;P&gt;Let me know if this resolves your issue or if you need further assistance.&lt;/P&gt;</description>
      <pubDate>Tue, 21 Jan 2025 08:49:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-clarification/m-p/106414#M42482</guid>
      <dc:creator>boitumelodikoko</dc:creator>
      <dc:date>2025-01-21T08:49:02Z</dc:date>
    </item>
  </channel>
</rss>

