Data Engineering

Autoloader with availableNow=True and overwrite mode removes data in second micro-batch (DBR 16.3)

divyansh8989
New Contributor


Hi everyone,

I'm encountering an issue after upgrading to Databricks Runtime 16.3, while using Autoloader with the following configuration:

trigger(availableNow=True)
outputMode("overwrite")


When a new file arrives, Autoloader processes it and writes the data to a Delta table. However, I consistently observe two micro-batches being triggered:

First micro-batch ingests the file and writes data to the Delta table.
Second micro-batch, triggered just a few seconds later, finds no new files and still executes in overwrite mode — which ends up removing the previously written data.
This behavior is confirmed in the Delta table history:

Version 422: file ingested, 3848 rows written.
Version 423: file removed, no new data written.
Checkpoint directory also shows commit files 422 and 423, confirming two micro-batches.
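
For reference, this is roughly how I'm pulling the table history (the table path below is a placeholder):

    from delta.tables import DeltaTable

    # `spark` is the ambient SparkSession in a Databricks notebook
    # Placeholder path; the real table sits in our own storage account
    delta_table_path = "abfss://<container>@<storage-account>.dfs.core.windows.net/delta_table/"

    # Versions 422 and 423 show up as the two micro-batch commits
    delta_table = DeltaTable.forPath(spark, delta_table_path)
    delta_table.history(5).select("version", "operation", "operationMetrics").show(truncate=False)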

This issue started occurring after we upgraded to DBR 16.3. Prior to that, the overwrite behavior was stable and did not remove data unexpectedly.

Has anyone else encountered this issue? Is there a recommended way to prevent empty micro-batches from overwriting the table?

Any guidance or best practices would be greatly appreciated!

Thanks in advance.

1 REPLY

ashesharyak
New Contributor II

You've likely hit a behavioral change or a subtle interaction in Databricks Runtime 16.3 involving Autoloader, trigger(availableNow=True), and outputMode("overwrite"). This specific combination appears to trigger an unexpected second micro-batch that overwrites the data.

Here's a breakdown of why this might be happening and what you can do:

Understanding the Behavior

  • trigger(availableNow=True): This trigger processes all available data up to the moment the query starts as a single batch and then stops. It's designed for "run-once" or scheduled batch processing.

  • outputMode("overwrite"): This output mode overwrites the entire Delta table with the data from the current micro-batch.

  • The Problem in DBR 16.3: It appears that in DBR 16.3, even after the initial batch processes the new file and writes data, a subsequent empty micro-batch is being triggered very quickly. Because outputMode("overwrite") is set, this empty batch then overwrites the table, effectively deleting the data that was just written.

This wasn't the typical behavior in earlier DBR versions, where availableNow=True combined with overwrite would usually result in one write and then a graceful termination if no more data was found.

Potential Causes/Hypotheses:

  1. Change in availableNow Trigger Logic: There might be a subtle change in how availableNow interacts with the internal state management in DBR 16.3, leading to an extra "empty" micro-batch check and subsequent overwrite.

  2. Internal File Discovery/Checkpointing Changes: Autoloader uses a checkpoint location to track processed files. It's possible that the way file discovery or checkpointing is handled in DBR 16.3, especially with availableNow, is leading to a quick second check that reports no new files but still initiates a write operation due to the overwrite mode.

  3. Optimization or Bug: It could be an unintended consequence of an optimization or a bug introduced in DBR 16.3 for specific scenarios involving availableNow and overwrite.

Recommended Solutions and Best Practices:

Given the observed behavior, here's how you can prevent the data loss:

  1. Change outputMode("overwrite") to outputMode("append"):

    • Best Practice for Incremental Ingestion: For streaming or incremental ingestion with Autoloader, append is almost always the correct outputMode. This ensures that new data is added to the table without affecting existing data.

    • Why Overwrite is Risky Here: overwrite is generally used when you want to completely replace the table's contents, often in a full batch load scenario where you know the source will provide the complete dataset. For an incremental stream, it's problematic if an empty batch overwrites the entire table.
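
    • Example (PySpark): a minimal append-mode sketch, using placeholder paths and a placeholder file format:

    Python
     
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("AutoloaderAppend").getOrCreate()

    # Placeholder locations; substitute your own container and storage account
    source_path = "abfss://your-container@your-storage-account.dfs.core.windows.net/input/"
    delta_table_path = "abfss://your-container@your-storage-account.dfs.core.windows.net/delta_table/"
    checkpoint_location = "abfss://your-container@your-storage-account.dfs.core.windows.net/checkpoint/"

    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv") # Or your file format
        .option("cloudFiles.schemaLocation", f"{checkpoint_location}/schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(source_path)
        .writeStream
        .format("delta")
        .outputMode("append") # New rows are added; an empty micro-batch simply writes nothing
        .option("checkpointLocation", checkpoint_location)
        .trigger(availableNow=True)
        .start(delta_table_path)
        .awaitTermination())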

  2. Use foreachBatch for Upserts/Merges:

    • If your goal is to handle updates or deduplication (i.e., you might receive the same file again, or updated records), you should use outputMode("append") in conjunction with foreachBatch and perform a MERGE INTO operation.

    • This gives you granular control over how each micro-batch's data is integrated into the Delta table.

    • Example (PySpark):

    Python
     
    from delta.tables import DeltaTable
    from pyspark.sql import SparkSession
    
    # Initialize SparkSession (if not already done)
    spark = SparkSession.builder.appName("AutoloaderMerge").getOrCreate()
    
    # Define your source path and checkpoint location
    source_path = "abfss://your-container@your-storage-account.dfs.core.windows.net/input/"
    delta_table_path = "abfss://your-container@your-storage-account.dfs.core.windows.net/delta_table/"
    checkpoint_location = "abfss://your-container@your-storage-account.dfs.core.windows.net/checkpoint/"
    
    def upsert_to_delta(microBatchDF, batchId):
        # Create the Delta table from the first batch if it doesn't exist yet
        if not DeltaTable.isDeltaTable(spark, delta_table_path):
            microBatchDF.write.format("delta").mode("append").save(delta_table_path)
            print(f"Batch {batchId}: Created initial Delta table.")
        else:
            deltaTable = DeltaTable.forPath(spark, delta_table_path)
            # Perform merge operation
            deltaTable.alias("target") \
                .merge(
                    microBatchDF.alias("source"),
                    "target.id = source.id" # Replace 'id' with your actual primary key(s)
                ) \
                .whenMatchedUpdateAll() \
                .whenNotMatchedInsertAll() \
                .execute()
            print(f"Batch {batchId}: Merged {microBatchDF.count()} rows into Delta table.")
    
    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv") # Or your file format
        .option("cloudFiles.schemaLocation", f"{checkpoint_location}/schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(source_path)
        .writeStream
        .option("checkpointLocation", checkpoint_location)
        .foreachBatch(upsert_to_delta)
        .trigger(availableNow=True)
        .start()
        .awaitTermination())
    • Explanation:

      • foreachBatch(upsert_to_delta): This will call your upsert_to_delta function for each micro-batch.

      • Inside upsert_to_delta:

        • It checks if the Delta table exists. If not, it creates it with mode("append") for the initial load.

        • If the table exists, it performs a MERGE INTO operation. You'll need to define your merge condition based on your table's primary key(s). This ensures that existing records are updated and new records are inserted.

  3. Verify cloudFiles.allowOverwrites (and use with caution):

    • The cloudFiles.allowOverwrites option (default false) controls whether Autoloader processes files again if they are appended to or overwritten in the source.

    • Important: Setting cloudFiles.allowOverwrites to true (which is not typically recommended unless you manage duplicates downstream) might lead to reprocessing of files. Even then, it won't directly solve the issue of an empty second micro-batch overwriting everything. It's more about how Autoloader re-ingests source files that have changed.
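
    • If you do decide to experiment with it, it's just an extra option on the Autoloader read (a sketch only, reusing the placeholder paths from the example above):

    Python
     
    df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv") # Or your file format
        .option("cloudFiles.schemaLocation", f"{checkpoint_location}/schema")
        .option("cloudFiles.allowOverwrites", "true") # Default is "false"; be prepared to deduplicate downstream
        .load(source_path))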

  4. Report to Databricks Support:

    • Since this behavior started specifically after upgrading to DBR 16.3 and was not present before, it's worth opening a support ticket with Databricks. Provide them with your exact configuration, DBR version, and the observed Delta table history. They can confirm if it's a known regression or a new intended (but perhaps problematic for your use case) behavior.

Why outputMode("overwrite") with availableNow=True is tricky for incremental loads:

When you use trigger(availableNow=True), the expectation is that the stream processes everything once and then finishes. Combined with outputMode("overwrite"), every micro-batch that runs completely replaces the target table, whether or not it found new data. So if a spurious empty batch is triggered after the real one, the table is effectively emptied.
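
If you genuinely need overwrite semantics, one defensive pattern (a sketch, not an official fix for the DBR 16.3 behavior, and the helper name is only illustrative) is to move the write into foreachBatch and skip empty micro-batches explicitly, again reusing the placeholder paths from the example above:

    Python
     
    def overwrite_skip_empty(microBatchDF, batchId):
        # Skip the spurious empty micro-batch instead of letting it wipe the table
        if microBatchDF.isEmpty():
            print(f"Batch {batchId}: no new files, skipping write.")
            return
        # Replace the table contents only when the batch actually carries data
        microBatchDF.write.format("delta").mode("overwrite").save(delta_table_path)

    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv") # Or your file format
        .option("cloudFiles.schemaLocation", f"{checkpoint_location}/schema")
        .load(source_path)
        .writeStream
        .option("checkpointLocation", checkpoint_location)
        .foreachBatch(overwrite_skip_empty)
        .trigger(availableNow=True)
        .start()
        .awaitTermination())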

In summary, the most robust and recommended solution for your scenario is to switch from outputMode("overwrite") to using foreachBatch with MERGE INTO for idempotent updates/inserts, or simply outputMode("append") if you only expect new records and don't need to handle updates/deduplication at the Delta table level.