lingareddy_Alva
Esteemed Contributor

Hi @yit 

This is a classic timing and metadata synchronization issue between Delta table creation and Autoloader initialization.
Here's what's happening and how to fix it.

The error occurs because:
1. Delta table creation writes initial metadata to the _delta_log directory.
2. Autoloader schema inference tries to write to the same metadata location almost simultaneously.
3. ADLS eventual consistency can cause conflicts when operations happen too quickly.
4. Metastore synchronization may not be complete when Autoloader starts.
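
One thing worth ruling out first is whether the Autoloader schema or checkpoint location sits inside the table's own path. That is an assumption on my side, since I can't see where your paths point. A minimal sketch of such a check, in plain Python with no Spark needed (is_under is a hypothetical helper name, not a Databricks API):

```python
import posixpath
from urllib.parse import urlparse


def is_under(path: str, parent: str) -> bool:
    """Return True if `path` is `parent` itself or nested below it."""
    p, q = urlparse(path), urlparse(parent)
    # Different scheme or container/account means different storage roots
    if (p.scheme, p.netloc) != (q.scheme, q.netloc):
        return False
    child = posixpath.normpath(p.path or "/")
    root = posixpath.normpath(q.path or "/")
    # Compare on a path-segment boundary so /my-path-2 is not under /my-path
    return child == root or child.startswith(root.rstrip("/") + "/")
```

If is_under(checkpoint_path, table_location) or is_under(schema_path, table_location) comes back True, moving those locations to a separate directory may be enough on its own.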

Add Explicit Wait/Validation:

import time
from delta.tables import DeltaTable

def create_table_and_wait(table_name, table_location):
    """Create table and ensure it's ready for Autoloader"""
    
    # Create the external Delta table
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            -- your schema here
        ) USING DELTA
        LOCATION '{table_location}'
    """)
    
    # CREATE TABLE returns synchronously; this pause only gives the
    # metastore and storage a few seconds to settle before validation
    time.sleep(5)
    
    # Validate table is accessible and metadata is ready
    max_retries = 10
    for attempt in range(max_retries):
        try:
            # Try to access the Delta table metadata
            delta_table = DeltaTable.forPath(spark, table_location)
            table_version = delta_table.history(1).collect()[0].version
            print(f"Table ready at version {table_version}")
            break
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Waiting for table metadata... attempt {attempt + 1}")
                time.sleep(2)
            else:
                # Chain the original error so the root cause stays in the traceback
                raise RuntimeError(f"Table not ready after {max_retries} attempts: {e}") from e
    
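    # Additional validation - ensure the _delta_log directory exists
    # (rstrip avoids a double slash when table_location ends with "/")
    try:
        dbutils.fs.ls(f"{table_location.rstrip('/')}/_delta_log/")
        print("Delta log directory confirmed")
    except Exception as e:
        # Listing failed; report it instead of swallowing the error silently
        print(f"Delta log directory not visible yet: {e}")
        time.sleep(3)  # Additional wait if needed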

# Usage
create_table_and_wait("my_catalog.my_schema.my_table", "abfss://container@storage.dfs.core.windows.net/my-path/")

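# Now start Autoloader
# Schema inference needs cloudFiles.schemaLocation; "schema_path" is a
# placeholder, ideally a directory outside the table location
autoloader_stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", "schema_path") \
    .load("source_path") \
    .writeStream \
    .option("checkpointLocation", "checkpoint_path") \
    .toTable("my_catalog.my_schema.my_table")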
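
If you'd rather not hard-code the sleeps, the retry loop above can be pulled out into a small helper with exponential backoff. This is only a sketch: wait_until_ready and its parameters are names I made up for illustration, and the delays are guesses you would tune for your workspace.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_until_ready(
    check: Callable[[], T],
    max_retries: int = 10,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `check` until it stops raising, backing off between attempts."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(max_retries):
        try:
            return check()
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                # Delay doubles each attempt (1s, 2s, 4s, ...), capped at max_delay
                sleep(min(base_delay * 2 ** attempt, max_delay))
    raise TimeoutError(f"Not ready after {max_retries} attempts") from last_error
```

Usage would look something like wait_until_ready(lambda: DeltaTable.forPath(spark, table_location).history(1).collect()[0].version), called right after the CREATE TABLE and before starting the stream.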

 

LR