08-07-2025 08:49 PM
Hi @yit
This is a classic timing and metadata synchronization issue between Delta table creation and Autoloader initialization.
Here's what's happening and how to fix it.
The error occurs because:
- Delta table creation writes initial metadata to the _delta_log directory.
- Autoloader schema inference tries to write to the same metadata location almost simultaneously.
- ADLS eventual consistency can cause conflicts when operations happen too quickly.
- Metastore synchronization may not be complete when Autoloader starts.
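One way to check the first point is to look for a commit file in _delta_log, rather than only checking that the directory exists. Delta commit files are named with a 20-digit zero-padded version followed by .json. The helper name `has_delta_commit` below is hypothetical, and this is only a sketch of the idea, not something Autoloader itself does:

```python
import re
from typing import Iterable

# Delta commit files look like 00000000000000000000.json (20 digits).
_COMMIT_FILE = re.compile(r"\d{20}\.json")


def has_delta_commit(file_names: Iterable[str]) -> bool:
    """Return True if any name in a _delta_log listing is a commit file."""
    return any(_COMMIT_FILE.fullmatch(name) for name in file_names)


# In a Databricks notebook this could be fed from a directory listing, e.g.:
#   names = [f.name for f in dbutils.fs.ls(f"{table_location}/_delta_log/")]
#   has_delta_commit(names)
```

Checkpoint and .crc files are deliberately ignored here, since only a commit file shows that the table creation transaction has landed.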
Add Explicit Wait/Validation:
import time
from delta.tables import DeltaTable

def create_table_and_wait(table_name, table_location):
    """Create table and ensure it's ready for Autoloader"""
    # Create the external Delta table
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            -- your schema here
        ) USING DELTA
        LOCATION '{table_location}'
    """)
    # Wait for table creation to complete
    time.sleep(5)
    # Validate table is accessible and metadata is ready
    max_retries = 10
    for attempt in range(max_retries):
        try:
            # Try to access the Delta table metadata
            delta_table = DeltaTable.forPath(spark, table_location)
            table_version = delta_table.history(1).collect()[0].version
            print(f"Table ready at version {table_version}")
            break
        except Exception as e:
            if attempt < max_retries - 1:
                print(f"Waiting for table metadata... attempt {attempt + 1}")
                time.sleep(2)
            else:
                # Chain the original error so the root cause stays visible
                raise Exception(f"Table not ready after {max_retries} attempts: {e}") from e
    # Additional validation - ensure directory structure exists
    try:
        dbutils.fs.ls(f"{table_location}/_delta_log/")
        print("Delta log directory confirmed")
    except Exception:
        # Listing failed; give storage a little more time before continuing
        time.sleep(3)  # Additional wait if needed
# Usage
create_table_and_wait("my_catalog.my_schema.my_table", "abfss://container@storage.dfs.core.windows.net/my-path/")
# Now start Autoloader
autoloader_stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .load("source_path") \
    .writeStream \
    .option("checkpointLocation", "checkpoint_path") \
    .toTable("my_catalog.my_schema.my_table")
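If you need the same wait in several places, the retry loop can be pulled out into a small generic helper. This is a minimal sketch: the names `wait_until_ready`, `probe`, and `delay_s` are hypothetical, and the `sleep` parameter exists only so the wait can be swapped out when testing.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_until_ready(
    probe: Callable[[], T],
    max_retries: int = 10,
    delay_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call probe() until it returns without raising, or give up."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_retries + 1):
        try:
            return probe()
        except Exception as e:
            last_error = e
            # Only wait if another attempt is still coming
            if attempt < max_retries:
                sleep(delay_s)
    raise TimeoutError(f"Not ready after {max_retries} attempts") from last_error


# Possible use in the notebook (assumes spark and DeltaTable are available):
#   version = wait_until_ready(
#       lambda: DeltaTable.forPath(spark, table_location).history(1).collect()[0].version
#   )
```

Passing the check in as a callable keeps the waiting logic separate from the Delta-specific probe, so the same helper could also wrap the _delta_log listing.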
LR