<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic autoloader stops working if I do not drop table each time in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59242#M31347</link>
    <description>&lt;P&gt;I first create a catalog and schema and ingest some data into it as follows:&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;catalogName &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;'neo'&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;schemaName &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;'indicators'&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;locationPath &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;'s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx'&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;sqlContext&lt;/SPAN&gt;&lt;SPAN&gt;.sql&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;CREATE&lt;/SPAN&gt; &lt;SPAN&gt;CATALOG&lt;/SPAN&gt; &lt;SPAN&gt;IF&lt;/SPAN&gt; &lt;SPAN&gt;NOT&lt;/SPAN&gt; &lt;SPAN&gt;EXISTS&lt;/SPAN&gt;&lt;SPAN&gt; {catalogName} MANAGED &lt;/SPAN&gt;&lt;SPAN&gt;LOCATION&lt;/SPAN&gt; &lt;SPAN&gt;'{locationPath}'&lt;/SPAN&gt;&lt;SPAN&gt; COMMENT &lt;/SPAN&gt;&lt;SPAN&gt;'neo'"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;sqlContext&lt;/SPAN&gt;&lt;SPAN&gt;.sql&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;CREATE&lt;/SPAN&gt; &lt;SPAN&gt;SCHEMA&lt;/SPAN&gt; &lt;SPAN&gt;IF&lt;/SPAN&gt; &lt;SPAN&gt;NOT&lt;/SPAN&gt; &lt;SPAN&gt;EXISTS&lt;/SPAN&gt;&lt;SPAN&gt; {catalogName}.{schemaName}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;sqlContext&lt;/SPAN&gt;&lt;SPAN&gt;.sql&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;use&lt;/SPAN&gt;&lt;SPAN&gt; {catalogName}.{schemaName}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.&lt;/SPAN&gt;&lt;SPAN&gt;set&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"ts.catalogName"&lt;/SPAN&gt;&lt;SPAN&gt;, catalogName)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;###&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.read.&lt;/SPAN&gt;&lt;SPAN&gt;csv&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx/uploads/neo_daily_indicators_2024-01-09_20.38.45.csv"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;header&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;inferSchema&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;df.write.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"mergeSchema"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;catalogName&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;schemaName&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;.neo_indicators'&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;######&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;I then create an autoloader to ingest new files that arrive in the bucket, as follows:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Import functions&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; col, current_timestamp&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.types &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; StructType, StructField, StringType, DateType&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;custom_schema &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;StructType&lt;/SPAN&gt;&lt;SPAN&gt;([&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Date"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;DateType&lt;/SPAN&gt;&lt;SPAN&gt;(), &lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# Add other fields as needed&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;])&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.&lt;/SPAN&gt;&lt;SPAN&gt;set&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.databricks.delta.schema.autoMerge.enabled"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define variables used in code below&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;bucket_path &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/999999xxx/uploads/indicators/neo/"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;table_name &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"neo.indicators.neo_indicators"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;checkpoint_path &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"/tmp/_checkpoint/neo_indicators"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Clear out data from previous demo execution&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#spark.sql(f"DROP TABLE IF EXISTS {table_name}")&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dbutils.fs.&lt;/SPAN&gt;&lt;SPAN&gt;rm&lt;/SPAN&gt;&lt;SPAN&gt;(checkpoint_path, &lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Configure Auto Loader to ingest JSON data to a Delta table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;(spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"csv"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;schema&lt;/SPAN&gt;&lt;SPAN&gt;(custom_schema) &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;# Apply custom schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.schemaLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, checkpoint_path)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;load&lt;/SPAN&gt;&lt;SPAN&gt;(bucket_path)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"*"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"_metadata.file_path"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"source_file"&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;current_timestamp&lt;/SPAN&gt;&lt;SPAN&gt;().&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"processing_time"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .writeStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"checkpointLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, checkpoint_path)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"mergeSchema"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;# Enable schema migration&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;#.option("overwriteSchema", "true") &amp;nbsp;# overwrite schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;) &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;# Specify Delta format&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;trigger&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;availableNow&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;toTable&lt;/SPAN&gt;&lt;SPAN&gt;(table_name, &lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;#append mode&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;##########&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;it works fine if it is exactly as above, however, if I uncomment the line where the table is being dropped, I no longer get any files ingested after I copy them there and run the autoloader.&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;As I will be ingesting files constantly, I do not want to drop the table and recreate it each time, as it will take longer and longer each time, as more data will be there in the bucket which has to be read and ingested. The reason I have "append" in the code is exactly for this reason.&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Thanks&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;</description>
    <pubDate>Sun, 04 Feb 2024 21:26:32 GMT</pubDate>
    <dc:creator>OLAPTrader</dc:creator>
    <dc:date>2024-02-04T21:26:32Z</dc:date>
    <item>
      <title>autoloader stops working if I do not drop table each time</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59242#M31347</link>
      <description>&lt;P&gt;I first create a catalog and schema and ingest some data into it as follows:&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;catalogName &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;'neo'&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;schemaName &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;'indicators'&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;locationPath &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;'s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx'&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;sqlContext&lt;/SPAN&gt;&lt;SPAN&gt;.sql&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;CREATE&lt;/SPAN&gt; &lt;SPAN&gt;CATALOG&lt;/SPAN&gt; &lt;SPAN&gt;IF&lt;/SPAN&gt; &lt;SPAN&gt;NOT&lt;/SPAN&gt; &lt;SPAN&gt;EXISTS&lt;/SPAN&gt;&lt;SPAN&gt; {catalogName} MANAGED &lt;/SPAN&gt;&lt;SPAN&gt;LOCATION&lt;/SPAN&gt; &lt;SPAN&gt;'{locationPath}'&lt;/SPAN&gt;&lt;SPAN&gt; COMMENT &lt;/SPAN&gt;&lt;SPAN&gt;'neo'"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;sqlContext&lt;/SPAN&gt;&lt;SPAN&gt;.sql&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;CREATE&lt;/SPAN&gt; &lt;SPAN&gt;SCHEMA&lt;/SPAN&gt; &lt;SPAN&gt;IF&lt;/SPAN&gt; &lt;SPAN&gt;NOT&lt;/SPAN&gt; &lt;SPAN&gt;EXISTS&lt;/SPAN&gt;&lt;SPAN&gt; {catalogName}.{schemaName}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;sqlContext&lt;/SPAN&gt;&lt;SPAN&gt;.sql&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;use&lt;/SPAN&gt;&lt;SPAN&gt; {catalogName}.{schemaName}&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.&lt;/SPAN&gt;&lt;SPAN&gt;set&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"ts.catalogName"&lt;/SPAN&gt;&lt;SPAN&gt;, catalogName)&lt;/SPAN&gt;&lt;SPAN&gt;;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;###&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.read.&lt;/SPAN&gt;&lt;SPAN&gt;csv&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/99999999xxx/uploads/neo_daily_indicators_2024-01-09_20.38.45.csv"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;header&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;inferSchema&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;df.write.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"mergeSchema"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;saveAsTable&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;catalogName&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;schemaName&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;.neo_indicators'&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;######&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;I then create an autoloader to ingest new files that arrive in the bucket, as follows:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Import functions&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; col, current_timestamp&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.types &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; StructType, StructField, StringType, DateType&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;custom_schema &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;StructType&lt;/SPAN&gt;&lt;SPAN&gt;([&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;StructField&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Date"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;DateType&lt;/SPAN&gt;&lt;SPAN&gt;(), &lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# Add other fields as needed&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;])&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;spark.conf.&lt;/SPAN&gt;&lt;SPAN&gt;set&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"spark.databricks.delta.schema.autoMerge.enabled"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define variables used in code below&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;bucket_path &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;"s3a://databricks-workspace-olaptrader-stack-1-bucket/unity-catalog/999999xxx/uploads/indicators/neo/"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;table_name &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"neo.indicators.neo_indicators"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;checkpoint_path &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt; &lt;SPAN&gt;"/tmp/_checkpoint/neo_indicators"&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Clear out data from previous demo execution&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;#spark.sql(f"DROP TABLE IF EXISTS {table_name}")&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;dbutils.fs.&lt;/SPAN&gt;&lt;SPAN&gt;rm&lt;/SPAN&gt;&lt;SPAN&gt;(checkpoint_path, &lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;# Configure Auto Loader to ingest JSON data to a Delta table&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;(spark.readStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"csv"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;schema&lt;/SPAN&gt;&lt;SPAN&gt;(custom_schema) &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;# Apply custom schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.schemaLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, checkpoint_path)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;load&lt;/SPAN&gt;&lt;SPAN&gt;(bucket_path)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"*"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"_metadata.file_path"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"source_file"&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;current_timestamp&lt;/SPAN&gt;&lt;SPAN&gt;().&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"processing_time"&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .writeStream&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"checkpointLocation"&lt;/SPAN&gt;&lt;SPAN&gt;, checkpoint_path)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"mergeSchema"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;) &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;# Enable schema migration&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;#.option("overwriteSchema", "true") &amp;nbsp;# overwrite schema&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;) &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;# Specify Delta format&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;trigger&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;availableNow&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;toTable&lt;/SPAN&gt;&lt;SPAN&gt;(table_name, &lt;/SPAN&gt;&lt;SPAN&gt;mode&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"append"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;#append mode&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;##########&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;it works fine if it is exactly as above, however, if I uncomment the line where the table is being dropped, I no longer get any files ingested after I copy them there and run the autoloader.&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;As I will be ingesting files constantly, I do not want to drop the table and recreate it each time, as it will take longer and longer each time, as more data will be there in the bucket which has to be read and ingested. The reason I have "append" in the code is exactly for this reason.&amp;nbsp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Thanks&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Sun, 04 Feb 2024 21:26:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59242#M31347</guid>
      <dc:creator>OLAPTrader</dc:creator>
      <dc:date>2024-02-04T21:26:32Z</dc:date>
    </item>
    <item>
      <title>Re: autoloader stops working if I do not drop table each time</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59284#M31363</link>
      <description>&lt;P&gt;Thank you for your reply. When I uncomment the line which drops the table, this is when it actually works. I get the most recent csv ingested. (plus older ones I assume). This is not what I want to do. so, in short, when I do not drop the table, nothing works (no errors but no recent data ingested).&amp;nbsp;I think you may have misread the question.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 05 Feb 2024 13:27:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59284#M31363</guid>
      <dc:creator>OLAPTrader</dc:creator>
      <dc:date>2024-02-05T13:27:28Z</dc:date>
    </item>
    <item>
      <title>Re: autoloader stops working if I do not drop table each time</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59479#M31421</link>
      <description>&lt;P&gt;Thanks, I am not dropping the table and I do use append mode. However, the only time the autoloader works if if I do indeed drop the table. I was doing that just for testing. When I comment that line, I never get new files ingested into the table.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 06 Feb 2024 15:39:12 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59479#M31421</guid>
      <dc:creator>OLAPTrader</dc:creator>
      <dc:date>2024-02-06T15:39:12Z</dc:date>
    </item>
    <item>
      <title>Re: autoloader stops working if I do not drop table each time</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59536#M31438</link>
      <description>&lt;P&gt;My issue was due to the fact that I have over 300 columns and due to datatype mismatches, the rows were actually written to the table, but values were all null. That's why I didnt get any errors. I am doing manual datatype mapping now and I am able to get autoloader working and appending and am not dropping the table. Thanks everyone.&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 07 Feb 2024 03:29:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-stops-working-if-i-do-not-drop-table-each-time/m-p/59536#M31438</guid>
      <dc:creator>OLAPTrader</dc:creator>
      <dc:date>2024-02-07T03:29:42Z</dc:date>
    </item>
  </channel>
</rss>

