# Import functions
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DateType
custom_schema = StructType([
StructField("Date", DateType(), True),
# Add other fields as needed
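    # For example (hypothetical column names, not from the real files -- replace
    # with whatever your CSVs actually contain):
    #   StructField("Symbol", StringType(), True),
    #   StructField("Close", StringType(), True),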
])
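# Allow Delta writes to auto-merge schemas, so new columns appearing in incoming
# files are added to the table's schema instead of failing the write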
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# Define variables used in code below
bucket_path = f"s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/999999xxx/uploads/indicators/neo/"
table_name = "neo.indicators.neo_indicators"
checkpoint_path = "/tmp/_checkpoint/neo_indicators"
# Clear out data from previous demo execution
#spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest CSV data into a Delta table
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .schema(custom_schema)  # Apply custom schema
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(bucket_path)
    .select("*",
            col("_metadata.file_path").alias("source_file"),
            current_timestamp().alias("processing_time"))
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")  # Enable schema migration
    # .option("overwriteSchema", "true")  # overwrite schema
    .format("delta")  # Specify Delta format
    .trigger(availableNow=True)
    .outputMode("append")  # streaming writers use outputMode(), not a mode= argument
    .toTable(table_name))
##########
It works fine exactly as written above. However, if I uncomment the line that drops the table, I no longer get any files ingested when I copy new files to the bucket and run Auto Loader.
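(For reference, a quick way to see which files actually landed in the table is a query along these lines; the only thing it assumes is the table_name variable defined above:)

from pyspark.sql.functions import count

# Count ingested rows per source file to confirm what the last run picked up
(spark.table(table_name)
    .groupBy("source_file")
    .agg(count("*").alias("rows"))
    .orderBy("source_file")
    .show(truncate=False))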
Since I will be ingesting files constantly, I do not want to drop and recreate the table on every run: as more and more data accumulates in the bucket, re-reading and re-ingesting all of it would take longer each time. That is exactly why the code writes in append mode.
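(The steady-state job I have in mind is roughly the sketch below: the same pipeline as above, just run repeatedly without dropping the table or clearing the checkpoint, so each run only picks up files Auto Loader has not seen yet and appends them. The awaitTermination() call is just so a scheduled run waits for the availableNow batch to finish; everything else reuses the variables defined above.)

# Per-run ingest: checkpoint and table are kept between runs, so only newly
# arrived files are read and appended
query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .schema(custom_schema)
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(bucket_path)
    .select("*",
            col("_metadata.file_path").alias("source_file"),
            current_timestamp().alias("processing_time"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .outputMode("append")
    .toTable(table_name))

query.awaitTermination()  # returns once everything currently in the bucket has been processed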
Thanks