
databricks autoloader not updating table immediately

rakeshprasad1
New Contributor III

I have a simple Auto Loader job which looks like this:

df_dwu_limit = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@xxxxx.dfs.core.windows.net/synapse-usage/")\
    .writeStream \
    .format("delta")\
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@xxxxxx.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True)\
    .toTable("platform_dnu.synapse_usage_api_landing")

On the very next line I use that Delta table for further processing.

I also have a count(*) query before and after the Auto Loader write. The count doesn't change, although I can see in the Auto Loader progress output that records have been written.

If I wait for a minute or so and run the count(*) query again, I can see the updated records. How do I solve this issue?
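For what it's worth, the Delta table history could be used to see exactly when the streaming batch commits relative to when the count(*) runs; a minimal sketch (not part of the job above, just a way of checking the commit time):

from delta.tables import DeltaTable

landing = DeltaTable.forName(spark, "platform_dnu.synapse_usage_api_landing")

# history(1) returns the most recent commit; 'timestamp' and 'operation'
# show when the streaming write actually landed in the table.
landing.history(1).select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)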

Output of the Auto Loader query for one particular session:

{
  "id" : "cb9a28b4-c5b4-4865-bc65-b3ca5efd2537",
  "runId" : "64c2afd9-ad69-4e9a-97bf-d6fa2794931a",
  "name" : null,
  "timestamp" : "2022-12-03T04:44:17.591Z",
  "batchId" : 7,
  "numInputRows" : 27,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.7879760688749453,
  "durationMs" : {
    "addBatch" : 3005,
    "commitOffsets" : 146,
    "getBatch" : 12,
    "latestOffset" : 30380,
    "queryPlanning" : 61,
    "triggerExecution" : 34259,
    "walCommit" : 222
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[abfss://synapse-usage@platformbilling.dfs.core.windows.net/synapse-usage/]",
    "startOffset" : {
      "seqNum" : 2534,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "endOffset" : {
      "seqNum" : 2562,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "latestOffset" : null,
    "numInputRows" : 27,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.7879760688749453,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[abfss://adb-delta-synapse-usage-api@platbillingdatabricks.dfs.core.windows.net/delta/synapse_usage_api_landing]",
    "numOutputRows" : -1
  }
}

DDL for the Delta table (image attached).


3 REPLIES

Hubert-Dudek
Esteemed Contributor III

Can you share the whole code with the counts you mentioned?

# Databricks notebook source
from pyspark.sql.functions import col, explode, upper, lower, lit, floor, coalesce, to_date, date_format, to_timestamp
from pyspark.sql.types import StructType
import json
from azure.storage.blob import BlobServiceClient, ContentSettings
from datetime import datetime
 
 
# COMMAND ----------
 
spark.conf.set("fs.azure.account.auth.type.XXXXXX.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.XXXXXX.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.XXXXXX.dfs.core.windows.net", "")
 
spark.conf.set("fs.azure.account.auth.type.YYYYYYY.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.YYYYYYY.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.YYYYYYY.dfs.core.windows.net", "")
 
 
schema_string = "{\"fields\":[{\"metadata\":{},\"name\":\"DWU_LIMIT\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"DWU_USED\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"cost\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"interval\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"resource_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"rg_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"sub_id\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"
 
schemaFromJson = StructType.fromJson(json.loads(schema_string))
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC DROP VIEW IF EXISTS v_synapse_usage_final;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_limit;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_used;
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the start
 
# COMMAND ----------
 
# ##print schema
# files_to_process = 'abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/'
 
# df_json_from_adls = spark.read\
# .option("recursiveFileLookup", "True")\
# .json(files_to_process)
 
# print(df_json_from_adls.schema.json())
 
# # #df_json_from_adls.write.saveAsTable('dnu_synpase_tmp')
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the start
 
# COMMAND ----------
 
df_dwu_limit = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/")\
    .writeStream \
    .format("delta")\
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@YYYYYYY.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True)\
    .toTable("platform_dnu.synapse_usage_api_landing")
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the end
 
# COMMAND ----------
 
df_dwu_limit_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_LIMIT FROM platform_dnu.synapse_usage_api_landing")
df_dwu_used_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_USED FROM platform_dnu.synapse_usage_api_landing")
 
# COMMAND ----------
 
df_dwu_limit = df_dwu_limit_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_LIMIT' type", "explode(DWU_LIMIT) DWU_LIMIT")\
    .selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_LIMIT['average'] as AVERAGE", "DWU_LIMIT['maximum'] as MAXIMUM", "DWU_LIMIT['minimum'] as MINIMUM", "to_date(DWU_LIMIT['timestamp']) as date", "to_timestamp(DWU_LIMIT['timestamp']) as timestamp")
 
df_dwu_used = df_dwu_used_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_USED' type", "explode(DWU_USED) DWU_USED")\
    .selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_USED['average'] as AVERAGE", "DWU_USED['maximum'] as MAXIMUM", "DWU_USED['minimum'] as MINIMUM", "to_date(DWU_USED['timestamp']) as date", "to_timestamp(DWU_USED['timestamp']) as timestamp")
 
# COMMAND ----------
 
df_dwu_limit.createOrReplaceTempView("v_df_dwu_limit")
df_dwu_used.createOrReplaceTempView("v_df_dwu_used")
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC 
# MAGIC CREATE OR REPLACE TEMPORARY VIEW v_synapse_usage_final
# MAGIC as
# MAGIC select
# MAGIC coalesce(dl.sub_id, du.sub_id) as sub_id,
# MAGIC coalesce(dl.rg_name, du.rg_name) as rg_name,
# MAGIC coalesce(dl.resource_name, du.resource_name) as resource_name,
# MAGIC coalesce(dl.id, du.id) as `id`,
# MAGIC coalesce(dl.`cost`, du.`cost`) as `cost`,
# MAGIC coalesce(dl.`interval`, du.`interval`) as `interval`,
# MAGIC coalesce(dl.date, du.date) as `date`,
# MAGIC coalesce(dl.timestamp, du.timestamp) as `timestamp`,
# MAGIC du.AVERAGE as DWU_USED_AVERAGE,
# MAGIC du.MAXIMUM as DWU_USED_MAXIMUM,
# MAGIC du.MINIMUM as DWU_USED_MINIMUM,
# MAGIC dl.AVERAGE as DWU_LIMIT_AVERAGE,
# MAGIC dl.MAXIMUM as DWU_LIMIT_MAXIMUM,
# MAGIC dl.MINIMUM as DWU_LIMIT_MINIMUM
# MAGIC from v_df_dwu_limit dl join v_df_dwu_used du
# MAGIC on
# MAGIC dl.id = du.id
# MAGIC and dl.`date` = du.`date`
# MAGIC and dl.`timestamp` = du.`timestamp`
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC select count(*) from v_synapse_usage_final;
 
# COMMAND ----------
 
df_synapse_usage_final = spark.sql("select * from v_synapse_usage_final")
 
# COMMAND ----------
 
from delta.tables import DeltaTable
 
synapse_usage_upsert = DeltaTable.forName(spark, 'platform_dnu.synapse_usage_api')    # Hive metastore-based tables
 
synapse_usage_upsert.alias("t").merge(
    df_synapse_usage_final.alias("s"),
    "s.id = t.id and s.interval = t.interval and s.timestamp = t.timestamp") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
 
 
# COMMAND ----------
 
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the end
 
# COMMAND ----------
 
# %sql
# select * from platform_dnu.synapse_usage_api_landing where
# `id` = '/subscriptions/3a683d84-be08-4356-bb14-3b62df1bad55/resourcegroups/diageo-analytics-prod-rg-funclib/providers/microsoft.sql/servers/azeunfunclibp001/databases/diageo-analytics-prod-asa-funclib-prod01';

That is the complete code.

The actual counts won't matter, as they change from session to session.

The issue is that lines 52 and 69 of the notebook (the count(*) queries on the landing table before and after the Auto Loader write) should not return the same count when Auto Loader is showing that files were ingested.

And if I run line 69 after a minute, rather than immediately, it does show me the updated count.

But I do not want to implement wait(60) logic.
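What I would ideally do instead, if it behaves the way I expect, is keep the handle that writeStream...toTable() returns and block on it; with trigger(availableNow=True) the query stops on its own once the backlog is processed, so awaitTermination() should only return after the batch has been committed. A rough, untested sketch (same paths and table as above):

query = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@YYYYYYY.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True) \
    .toTable("platform_dnu.synapse_usage_api_landing")

# Block here instead of a fixed time.sleep(60); the query stops once the
# available files have been processed and committed to the Delta table.
query.awaitTermination()

# Only now run the downstream count / transformations.
print(spark.table("platform_dnu.synapse_usage_api_landing").count())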

Go to the driver logs and check the log4j output. Do you see any streaming query progress output? Also, go to the "Spark UI" tab and select the "Streaming" sub-tab. Do you see any micro-batches completed?
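The same information can also be checked programmatically from the notebook; a minimal sketch, assuming the stream is still registered on the current SparkSession (or that the query handle returned by toTable() is still in scope):

# List every streaming query attached to this SparkSession and print its
# status and most recent progress (the same JSON shown earlier in this thread).
for q in spark.streams.active:
    print(q.name, q.id, q.status)   # e.g. whether a trigger is currently active
    print(q.lastProgress)           # most recent progress dict, or None before the first batch

# If the handle from writeStream.toTable() is still in scope:
# query.recentProgress  -> list of the last few progress dicts
# query.lastProgress    -> the most recent one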
