<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: databricks autoloader not updating table immediately in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18859#M12576</link>
    <description>Re: databricks autoloader not updating table immediately in Data Engineering</description>
    <pubDate>Sun, 04 Dec 2022 01:08:44 GMT</pubDate>
    <dc:creator>rakeshprasad1</dc:creator>
    <dc:date>2022-12-04T01:08:44Z</dc:date>
    <item>
      <title>databricks autoloader not updating table immediately</title>
      <link>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18857#M12574</link>
      <description>&lt;P&gt;I have a simple Auto Loader job which looks like this:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;df_dwu_limit = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@xxxxx.dfs.core.windows.net/synapse-usage/")\
    .writeStream \
    .format("delta")\
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@xxxxxx.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True)\
    .toTable("platform_dnu.synapse_usage_api_landing")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;very next line i am using delta table "" to further processing.&lt;/P&gt;&lt;P&gt;i also have count(*) query before and after auto loader. count doesn't change. Although i can see in auto loader profile, record has been written.&lt;/P&gt;&lt;P&gt;If I wait for 1 min or so, and run count(*) query, i can see updated record. how to solve this issue?&lt;/P&gt;&lt;P&gt;o/p of auto loader for one particular session&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;{
  "id" : "cb9a28b4-c5b4-4865-bc65-b3ca5efd2537",
  "runId" : "64c2afd9-ad69-4e9a-97bf-d6fa2794931a",
  "name" : null,
  "timestamp" : "2022-12-03T04:44:17.591Z",
  "batchId" : 7,
  "numInputRows" : 27,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.7879760688749453,
  "durationMs" : {
    "addBatch" : 3005,
    "commitOffsets" : 146,
    "getBatch" : 12,
    "latestOffset" : 30380,
    "queryPlanning" : 61,
    "triggerExecution" : 34259,
    "walCommit" : 222
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[abfss://synapse-usage@platformbilling.dfs.core.windows.net/synapse-usage/]",
    "startOffset" : {
      "seqNum" : 2534,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "endOffset" : {
      "seqNum" : 2562,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "latestOffset" : null,
    "numInputRows" : 27,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.7879760688749453,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[abfss://adb-delta-synapse-usage-api@platbillingdatabricks.dfs.core.windows.net/delta/synapse_usage_api_landing]",
    "numOutputRows" : -1
  }
}&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;DDL for the Delta table (image attached):&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="auto-loader issue"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/1052i26CF0691A2CCFEF8/image-size/large?v=v2&amp;amp;px=999" role="button" title="auto-loader issue" alt="auto-loader issue" /&gt;&lt;/span&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Sat, 03 Dec 2022 13:27:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18857#M12574</guid>
      <dc:creator>rakeshprasad1</dc:creator>
      <dc:date>2022-12-03T13:27:17Z</dc:date>
    </item>
    <item>
      <title>Re: databricks autoloader not updating table immediately</title>
      <link>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18858#M12575</link>
      <description>&lt;P&gt;Can you share the whole code with the counts you mentioned?&lt;/P&gt;</description>
      <pubDate>Sat, 03 Dec 2022 16:08:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18858#M12575</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2022-12-03T16:08:33Z</dc:date>
    </item>
    <item>
      <title>Re: databricks autoloader not updating table immediately</title>
      <link>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18859#M12576</link>
      <description>&lt;PRE&gt;&lt;CODE&gt;# Databricks notebook source
from pyspark.sql.functions import col, explode, upper, lower, lit, floor, coalesce, to_date, date_format, to_timestamp
from pyspark.sql.types import StructType
import json
from azure.storage.blob import BlobServiceClient, ContentSettings
from datetime import datetime
&amp;nbsp;
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
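# SAS authentication for the source storage account (XXXXXX) and the checkpoint account (YYYYYYY);
# the fixed SAS token values are blanked out in this post.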
spark.conf.set("fs.azure.account.auth.type.XXXXXX.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.XXXXXX.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.XXXXXX.dfs.core.windows.net", "")
&amp;nbsp;
spark.conf.set("fs.azure.account.auth.type.YYYYYYY.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.YYYYYYY.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.YYYYYYY.dfs.core.windows.net", "")
&amp;nbsp;
&amp;nbsp;
schema_string = "{\"fields\":[{\"metadata\":{},\"name\":\"DWU_LIMIT\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"DWU_USED\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"cost\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"interval\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"resource_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"rg_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"sub_id\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"
&amp;nbsp;
schemaFromJson = StructType.fromJson(json.loads(schema_string))
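# (The schema JSON above appears to have been captured from the commented-out inference cell
#  further down, via print(df_json_from_adls.schema.json()); hard-coding it avoids re-inferring
#  the schema on every run.)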
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# MAGIC %sql
# MAGIC DROP VIEW IF EXISTS v_synapse_usage_final;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_limit;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_used;
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the start
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# ##print schema
# files_to_process = 'abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/'
&amp;nbsp;
# df_json_from_adls = spark.read\
# .option("recursiveFileLookup", "True")\
# .json(files_to_process)
&amp;nbsp;
# print(df_json_from_adls.schema.json())
&amp;nbsp;
# # #df_json_from_adls.write.saveAsTable('dnu_synpase_tmp')
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the start
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
df_dwu_limit = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/")\
    .writeStream \
    .format("delta")\
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@YYYYYYY.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True)\
    .toTable("platform_dnu.synapse_usage_api_landing")
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the end
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
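# Read back the landing table just populated by the streaming write above, splitting it into
# one DataFrame per metric array (DWU_LIMIT and DWU_USED) so each can be flattened separately.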
df_dwu_limit_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_LIMIT FROM platform_dnu.synapse_usage_api_landing")
df_dwu_used_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_USED FROM platform_dnu.synapse_usage_api_landing")
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
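# explode() emits one row per element of the metric array; the literal 'DWU_LIMIT' / 'DWU_USED'
# value becomes a 'type' column, and each sample's timeStamp string is converted to separate
# date and timestamp columns.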
df_dwu_limit = df_dwu_limit_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_LIMIT' type", "explode(DWU_LIMIT) DWU_LIMIT")\
    .selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_LIMIT['average'] as AVERAGE", "DWU_LIMIT['maximum'] as MAXIMUM", "DWU_LIMIT['minimum'] as MINIMUM", "to_date(DWU_LIMIT['timestamp']) as date", "to_timestamp(DWU_LIMIT['timestamp']) as timestamp")
&amp;nbsp;
df_dwu_used = df_dwu_used_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_USED' type", "explode(DWU_USED) DWU_USED")\
    .selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_USED['average'] as AVERAGE", "DWU_USED['maximum'] as MAXIMUM", "DWU_USED['minimum'] as MINIMUM", "to_date(DWU_USED['timestamp']) as date", "to_timestamp(DWU_USED['timestamp']) as timestamp")
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
df_dwu_limit.createOrReplaceTempView("v_df_dwu_limit")
df_dwu_used.createOrReplaceTempView("v_df_dwu_used")
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# MAGIC %sql
# MAGIC 
# MAGIC CREATE OR REPLACE TEMPORARY VIEW v_synapse_usage_final
# MAGIC as
# MAGIC select
# MAGIC coalesce(dl.sub_id, du.sub_id) as sub_id,
# MAGIC coalesce(dl.rg_name, du.rg_name) as rg_name,
# MAGIC coalesce(dl.resource_name, du.resource_name) as resource_name,
# MAGIC coalesce(dl.id, du.id) as `id`,
# MAGIC coalesce(dl.`cost`, du.`cost`) as `cost`,
# MAGIC coalesce(dl.`interval`, du.`interval`) as `interval`,
# MAGIC coalesce(dl.date, du.date) as `date`,
# MAGIC coalesce(dl.timestamp, du.timestamp) as `timestamp`,
# MAGIC du.AVERAGE as DWU_USED_AVERAGE,
# MAGIC du.MAXIMUM as DWU_USED_MAXIMUM,
# MAGIC du.MINIMUM as DWU_USED_MINIMUM,
# MAGIC dl.AVERAGE as DWU_LIMIT_AVERAGE,
# MAGIC dl.MAXIMUM as DWU_LIMIT_MAXIMUM,
# MAGIC dl.MINIMUM as DWU_LIMIT_MINIMUM
# MAGIC from v_df_dwu_limit dl join v_df_dwu_used du
# MAGIC on
# MAGIC dl.id = du.id
# MAGIC and dl.`date` = du.`date`
# MAGIC and dl.`timestamp` = du.`timestamp`
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# MAGIC %sql
# MAGIC select count(*) from v_synapse_usage_final;
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
df_synapse_usage_final = spark.sql("select * from v_synapse_usage_final")
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
from delta.tables import DeltaTable
&amp;nbsp;
synapse_usage_upsert = DeltaTable.forName(spark, 'platform_dnu.synapse_usage_api')    # Hive metastore-based tables
&amp;nbsp;
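# Upsert the flattened usage rows into the reporting table, matching on id, interval and timestamp.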
synapse_usage_upsert.alias("t").merge(
    df_synapse_usage_final.alias("s"),
    "s.id = t.id and s.interval = t.interval and s.timestamp = t.timestamp") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
&amp;nbsp;
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the end
&amp;nbsp;
# COMMAND ----------
&amp;nbsp;
# %sql
# select * from platform_dnu.synapse_usage_api_landing where
# `id` = '/subscriptions/3a683d84-be08-4356-bb14-3b62df1bad55/resourcegroups/diageo-analytics-prod-rg-funclib/providers/microsoft.sql/servers/azeunfunclibp001/databases/diageo-analytics-prod-asa-funclib-prod01';&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Complete code above.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The absolute counts don't matter, since they change from session to session.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;The issue is that lines 52 and 69 should not return the same count when Auto Loader reports that it has ingested files.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;If I run line 69 after about a minute instead of immediately, it does show the updated count.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;But I do not want to implement &lt;B&gt;wait(60)&lt;/B&gt; logic.&lt;/P&gt;</description>
      <pubDate>Sun, 04 Dec 2022 01:08:44 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18859#M12576</guid>
      <dc:creator>rakeshprasad1</dc:creator>
      <dc:date>2022-12-04T01:08:44Z</dc:date>
    </item>
    <item>
      <title>Re: databricks autoloader not updating table immediately</title>
      <link>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18860#M12577</link>
      <description>&lt;P&gt;Go to the driver logs and check the log4j output. Do you see any streaming query progress output? Also, go to the "Spark UI" tab and select the "Streaming" sub-tab. Do you see any completed micro-batches?&lt;/P&gt;</description>
      <pubDate>Wed, 25 Jan 2023 23:42:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databricks-autoloader-not-updating-table-immediately/m-p/18860#M12577</guid>
      <dc:creator>jose_gonzalez</dc:creator>
      <dc:date>2023-01-25T23:42:16Z</dc:date>
    </item>
  </channel>
</rss>

