Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

autoloader stops working if I do not drop table each time

OLAPTrader
New Contributor III

I first create a catalog and schema and ingest some data into it as follows:

catalogName = 'neo'
schemaName = 'indicators'
locationPath = 's3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx'
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalogName} MANAGED LOCATION '{locationPath}' COMMENT 'neo'")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalogName}.{schemaName}")
spark.sql(f"USE {catalogName}.{schemaName}")
spark.conf.set("ts.catalogName", catalogName)
###
df = spark.read.csv("s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx/uploads/neo_daily_indicators_2024-01-09_20.38.45.csv", header=True, inferSchema=True)
df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(f'{catalogName}.{schemaName}.neo_indicators')
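A quick sanity check on the batch load before setting up the stream (a minimal sketch, assuming the catalogName and schemaName variables defined above):

# Sketch: confirm the initial batch load wrote rows to the managed table
check_df = spark.table(f"{catalogName}.{schemaName}.neo_indicators")
print("rows loaded:", check_df.count())
check_df.show(5, truncate=False)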
 
######
I then create an autoloader to ingest new files that arrive in the bucket, as follows:
# Import functions
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DateType
custom_schema = StructType([
    StructField("Date", DateType(), True),
    # Add other fields as needed
])
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Define variables used in code below
bucket_path = "s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/999999xxx/uploads/indicators/neo/"
table_name = "neo.indicators.neo_indicators"
checkpoint_path = "/tmp/_checkpoint/neo_indicators"

# Clear out data from previous demo execution
#spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest CSV data into a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(custom_schema)  # Apply custom schema
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(bucket_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .option("mergeSchema", "true")  # Enable schema migration
  #.option("overwriteSchema", "true")  # overwrite schema
  .format("delta")  # Specify Delta format
  .trigger(availableNow=True)
  .outputMode("append")  # append mode; DataStreamWriter.toTable() takes no "mode" argument
  .toTable(table_name))
 
##########
It works fine only when I uncomment the line where the table is being dropped; with that line commented out, as above, I no longer get any files ingested after I copy them to the bucket and run the Auto Loader.
Since I will be ingesting files constantly, I do not want to drop and recreate the table each time; that would take longer and longer, because all the data already in the bucket would have to be read and ingested again. That is exactly why I have "append" in the code.
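One way to see whether the stream is even picking up files (a minimal sketch that reuses the variables defined above and drops the metadata columns for brevity; the query handle is not part of the original snippet):

query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(custom_schema)
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(bucket_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

query.awaitTermination()               # an availableNow run stops on its own
for progress in query.recentProgress:  # one progress report per micro-batch
    print(progress["batchId"], "input rows:", progress["numInputRows"])

If numInputRows is non-zero but the table still shows no useful data, the files are being read and the problem is in how the rows land in the table rather than in file discovery.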
 
Thanks
 
 
1 ACCEPTED SOLUTION


OLAPTrader
New Contributor III

My issue was due to the fact that I have over 300 columns and, because of datatype mismatches, the rows were actually written to the table but every value was null. That's why I didn't get any errors. I am doing manual datatype mapping now, and Auto Loader is working and appending without my dropping the table. Thanks everyone.
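For anyone hitting the same thing, an alternative to hand-mapping several hundred columns (a sketch only; "Close" is an illustrative column name, not one from the original post):

# Sketch: let Auto Loader infer CSV column types and pin only the problem columns.
# With an inferred schema, Auto Loader also adds a _rescued_data column, so values
# that do not match the schema are rescued there instead of silently becoming null.
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .option("cloudFiles.inferColumnTypes", "true")                # infer types instead of all strings
  .option("cloudFiles.schemaHints", "Date DATE, Close DOUBLE")  # override just the mismatched columns
  .load(bucket_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .option("mergeSchema", "true")
  .trigger(availableNow=True)
  .toTable(table_name))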


3 REPLIES

Thank you for your reply. When I uncomment the line that drops the table, that is when it actually works: I get the most recent CSV ingested (plus the older ones, I assume). That is not what I want to do. In short, when I do not drop the table, nothing works: no errors, but no recent data ingested. I think you may have misread the question.

Thanks, I am not dropping the table and I do use append mode. However, the only time the Auto Loader works is if I do indeed drop the table; I was doing that just for testing. When I comment that line out, I never get new files ingested into the table.
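The incremental pattern that matches this intent (a minimal sketch, reusing the variables defined above) keeps both the Delta table and the Auto Loader checkpoint between runs, so each availableNow trigger only processes files it has not seen before:

# Sketch of an incremental run: no DROP TABLE and no checkpoint deletion.
# Deleting the checkpoint (the dbutils.fs.rm in the original snippet) makes Auto Loader
# forget which files it has already ingested; only do that to reprocess from scratch.
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(custom_schema)
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(bucket_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))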

