cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

autoloader stops working if I do not drop table each time

OLAPTrader
New Contributor III

I first create a catalog and schema and ingest some data into it as follows:

catalogName = 'neo'
schemaName ='indicators'
locationPath = 's3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx'
sqlContext.sql(f"CREATE CATALOG IF NOT EXISTS {catalogName} MANAGED LOCATION '{locationPath}' COMMENT 'neo'");
sqlContext.sql(f"CREATE SCHEMA IF NOT EXISTS {catalogName}.{schemaName}");
sqlContext.sql(f"use {catalogName}.{schemaName}");
spark.conf.set("ts.catalogName", catalogName);
###
df = spark.read.csv("s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx/uploads/neo_daily_indicators_2024-01-09_20.38.45.csv", header=True, inferSchema=True)
df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(f'{catalogName}.{schemaName}.neo_indicators')
 
######
I then create an autoloader to ingest new files that arrive in the bucket, as follows:
# Import functions
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DateType
custom_schema = StructType([
    StructField("Date", DateType(), True),
    # Add other fields as needed
])
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Define variables used in code below
bucket_path = f"s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/999999xxx/uploads/indicators/neo/"
table_name = "neo.indicators.neo_indicators"
checkpoint_path = "/tmp/_checkpoint/neo_indicators"

# Clear out data from previous demo execution
#spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(custom_schema)  # Apply custom schema
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(bucket_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .option("mergeSchema", "true")  # Enable schema migration
  #.option("overwriteSchema", "true")  # overwrite schema
  .format("delta")  # Specify Delta format
  .trigger(availableNow=True)
  .toTable(table_name, mode="append")) #append mode
 
##########
it works fine if it is exactly as above, however, if I uncomment the line where the table is being dropped, I no longer get any files ingested after I copy them there and run the autoloader. 
As I will be ingesting files constantly, I do not want to drop the table and recreate it each time, as it will take longer and longer each time, as more data will be there in the bucket which has to be read and ingested. The reason I have "append" in the code is exactly for this reason. 
 
Thanks
 
 
1 ACCEPTED SOLUTION

Accepted Solutions

OLAPTrader
New Contributor III

My issue was due to the fact that I have over 300 columns and due to datatype mismatches, the rows were actually written to the table, but values were all null. That's why I didnt get any errors. I am doing manual datatype mapping now and I am able to get autoloader working and appending and am not dropping the table. Thanks everyone. 

View solution in original post

5 REPLIES 5

Kaniz
Community Manager
Community Manager

Hi @OLAPTrader, The issue you’re experiencing is likely because when you drop a table in Spark, it deletes the associated metadata. This includes the schema and any other information Spark uses to read the data. When you then try to append new data to the table, Spark doesn’t know how to interpret it because the schema information is missing.

 

In your case, you’re using the append mode in your write operation, which means Spark is trying to append the new data to the existing table. If the table doesn’t exist (because it was dropped), Spark will try to create a new table. 

 

However, since the schema information is missing, it doesn’t know how to make the table correctly; thus, no data is ingested.

 

To resolve this issue, you could consider the following options:

 

Avoid dropping the table: Since you’re constantly ingesting new data, there’s no need to drop the table. Just append the new data to the existing table.

Recreate the schema: If you need to drop the table, recreate the schema before ingesting new data. You can do this by defining the schema in your code and applying it when you create the table.

Use overwrite mode cautiously: This mode will overwrite the existing data in the table with the new data. Be careful with this mode, as it will delete all existing data in the table. It’s generally not recommended for scenarios where you want to keep historical data.

 

 

OLAPTrader
New Contributor III

Thank you for your reply. When I uncomment the line which drops the table, this is when it actually works. I get the most recent csv ingested. (plus older ones I assume). This is not what I want to do. so, in short, when I do not drop the table, nothing works (no errors but no recent data ingested). I think you may have misread the question. 

Kaniz
Community Manager
Community Manager

Hi @OLAPTrader, Thank you for sharing the details! 

 

When you drop the table and recreate it each time, it indeed has some downsides, especially as the data volume grows. The process of recreating the table involves reading all the existing data from the bucket and ingesting it again, which can become time-consuming and resource-intensive.

 

To avoid this, you can modify your approach. Instead of dropping and recreating the table, consider the following steps:

 

Initial Setup:

  • Create the table once with the initial data.
  • Set up the autoloader as you’ve described.

Incremental Data Ingestion:

  • When new files arrive in the bucket, don’t drop the table.
  • Use the append mode to add new data to the existing table.
  • Delta Lake will handle schema merging and efficiently add only the new data.

Periodic Optimization:

  • Periodically optimize the table using Delta Lake’s VACUUM command.
  • This command reclaims space by removing old versions of data files.
  • You can set up a schedule (e.g., daily or weekly) to run the VACUUM operation.

 

Feel free to adjust the frequency of the VACUUM operation based on your specific use case and data growth. If you have any further questions or need additional assistance, feel free to ask! 😊

OLAPTrader
New Contributor III

Thanks, I am not dropping the table and I do use append mode. However, the only time the autoloader works if if I do indeed drop the table. I was doing that just for testing. When I comment that line, I never get new files ingested into the table. 

OLAPTrader
New Contributor III

My issue was due to the fact that I have over 300 columns and due to datatype mismatches, the rows were actually written to the table, but values were all null. That's why I didnt get any errors. I am doing manual datatype mapping now and I am able to get autoloader working and appending and am not dropping the table. Thanks everyone. 

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!