I am trying to follow the documentation here:
https://learn.microsoft.com/en-us/azure/databricks/getting-started/etl-quick-start
My code looks like:
from pyspark.sql.functions import current_timestamp, input_file_name

(spark.readStream
    .format("cloudFiles")
    .option("header", "true")
    # .option("cloudFiles.partitionColumns", "date,hour")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.maxBytesPerTrigger", "10m")
    .option("cloudFiles.schemaHints", SCHEMA_HINT)
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load(file_path)
    .withColumn("source_file", input_file_name())
    .withColumn("processing_time", current_timestamp())
    .withColumnRenamed("date", "timestamp")
    .withColumnRenamed("FW_Version", "fw_version_1")
    .withColumnRenamed("fw_version", "fw_version_2")  # https://kb.databricks.com/en_US/sql/dupe-column-in-metadata
    .withColumnRenamed("Time_since_last_clear_[Min]", "Time_since_last_clear_min")  # Delta rejects column names containing brackets
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("path", delta_path)
    .trigger(availableNow=True)
    .toTable(table_name))
(I have commented out the partition option because one of my original columns has the same name as the partition column, so its values get overwritten by the path-derived value. I could not find a workaround; a sketch of how I understand the option is below.)
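For illustration only, this is how I believe cloudFiles.partitionColumns is meant to be used, assuming a Hive-style directory layout (the example path is made up):

# Hypothetical layout under file_path:
#   <file_path>/date=2024-01-01/hour=00/part-0000.csv
# Auto Loader would derive `date` and `hour` columns from the path.
# Because my CSVs *also* contain a `date` column, the path-derived
# value replaces it, which is why the option stays commented out above.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.partitionColumns", "date,hour")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path))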
However, the job fails with the following error:
AnalysisException: Incompatible format detected.
You are trying to write to `s3://nbu-ml/projects/rca/msft/dsm09collectx/delta` using Databricks Delta, but there is no
transaction log present. Check the upstream job to make sure that it is writing
using format("delta") and that you are trying to write to the table base path.
To disable this check, SET spark.databricks.delta.formatCheck.enabled=false
To learn more about Delta, see https://docs.databricks.com/delta/index.html
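Based on the error text, it sounds like the target path already contains files that are not part of a Delta table, but I have not confirmed that is the cause. A minimal sketch of the check I ran (assuming a Databricks notebook where dbutils is available; the path is copied from the error):

# List what already exists at the target path.
files = dbutils.fs.ls("s3://nbu-ml/projects/rca/msft/dsm09collectx/delta")
for f in files:
    print(f.path)
# If data files are present but there is no _delta_log/ directory, the
# path holds non-Delta output from an earlier run, which would match
# this error. The check can also be disabled, as the error itself
# suggests, via the session conf:
# spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")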