12-03-2022 05:27 AM
I have a simple Auto Loader job which looks like this:
df_dwu_limit = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@xxxxx.dfs.core.windows.net/synapse-usage/") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@xxxxxx.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True) \
    .toTable("platform_dnu.synapse_usage_api_landing")
On the very next line I am using that Delta table for further processing.
I also have a count(*) query before and after the Auto Loader step, and the count does not change, although I can see in the Auto Loader progress output that records have been written.
If I wait for a minute or so and then run the count(*) query, I can see the updated records. How do I solve this issue?
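For reference, since .toTable() returns a StreamingQuery handle, I am assuming something along the lines of the sketch below (not what the job currently does) would block until the availableNow batch has committed, so that the count(*) which follows sees the new rows. Is that the right approach?
# Keep the StreamingQuery handle returned by toTable() ...
query = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@xxxxx.dfs.core.windows.net/synapse-usage/") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@xxxxxx.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True) \
    .toTable("platform_dnu.synapse_usage_api_landing")

# ... and block here until the availableNow trigger has processed all files
# discovered for this run and committed the batch.
query.awaitTermination()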
Output of the Auto Loader for one particular session:
{
  "id" : "cb9a28b4-c5b4-4865-bc65-b3ca5efd2537",
  "runId" : "64c2afd9-ad69-4e9a-97bf-d6fa2794931a",
  "name" : null,
  "timestamp" : "2022-12-03T04:44:17.591Z",
  "batchId" : 7,
  "numInputRows" : 27,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.7879760688749453,
  "durationMs" : {
    "addBatch" : 3005,
    "commitOffsets" : 146,
    "getBatch" : 12,
    "latestOffset" : 30380,
    "queryPlanning" : 61,
    "triggerExecution" : 34259,
    "walCommit" : 222
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[abfss://synapse-usage@platformbilling.dfs.core.windows.net/synapse-usage/]",
    "startOffset" : {
      "seqNum" : 2534,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "endOffset" : {
      "seqNum" : 2562,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "latestOffset" : null,
    "numInputRows" : 27,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.7879760688749453,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[abfss://adb-delta-synapse-usage-api@platbillingdatabricks.dfs.core.windows.net/delta/synapse_usage_api_landing]",
    "numOutputRows" : -1
  }
}
DDL for the Delta table (image attached).
12-03-2022 08:08 AM
Can you share the whole code, including the counts you mentioned?
12-03-2022 05:08 PM
# Databricks notebook source
from pyspark.sql.functions import col, explode, upper, lower, lit, floor, coalesce, to_date, date_format, to_timestamp
from pyspark.sql.types import StructType
import json
from azure.storage.blob import BlobServiceClient, ContentSettings
from datetime import datetime
# COMMAND ----------
spark.conf.set("fs.azure.account.auth.type.XXXXXX.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.XXXXXX.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.XXXXXX.dfs.core.windows.net", "")
spark.conf.set("fs.azure.account.auth.type.YYYYYYY.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.YYYYYYY.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.YYYYYYY.dfs.core.windows.net", "")
schema_string = "{\"fields\":[{\"metadata\":{},\"name\":\"DWU_LIMIT\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"DWU_USED\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"cost\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"interval\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"resource_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"rg_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"sub_id\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"
schemaFromJson = StructType.fromJson(json.loads(schema_string))
# COMMAND ----------
# MAGIC %sql
# MAGIC DROP VIEW IF EXISTS v_synapse_usage_final;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_limit;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_used;
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the start
# COMMAND ----------
# ##print schema
# files_to_process = 'abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/'
# df_json_from_adls = spark.read\
# .option("recursiveFileLookup", "True")\
# .json(files_to_process)
# print(df_json_from_adls.schema.json())
# # #df_json_from_adls.write.saveAsTable('dnu_synpase_tmp')
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the start
# COMMAND ----------
df_dwu_limit = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/") \
    .writeStream \
    .format("delta") \
    .option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@YYYYYYY.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True) \
    .toTable("platform_dnu.synapse_usage_api_landing")
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the end
# COMMAND ----------
df_dwu_limit_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_LIMIT FROM platform_dnu.synapse_usage_api_landing")
df_dwu_used_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_USED FROM platform_dnu.synapse_usage_api_landing")
# COMMAND ----------
df_dwu_limit = df_dwu_limit_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_LIMIT' type", "explode(DWU_LIMIT) DWU_LIMIT")\
.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_LIMIT['average'] as AVERAGE", "DWU_LIMIT['maximum'] as MAXIMUM", "DWU_LIMIT['minimum'] as MINIMUM", "to_date(DWU_LIMIT['timestamp']) as date", "to_timestamp(DWU_LIMIT['timestamp']) as timestamp")
df_dwu_used = df_dwu_used_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_USED' type", "explode(DWU_USED) DWU_USED")\
.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_USED['average'] as AVERAGE", "DWU_USED['maximum'] as MAXIMUM", "DWU_USED['minimum'] as MINIMUM", "to_date(DWU_USED['timestamp']) as date", "to_timestamp(DWU_USED['timestamp']) as timestamp")
# COMMAND ----------
df_dwu_limit.createOrReplaceTempView("v_df_dwu_limit")
df_dwu_used.createOrReplaceTempView("v_df_dwu_used")
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC CREATE OR REPLACE TEMPORARY VIEW v_synapse_usage_final
# MAGIC as
# MAGIC select
# MAGIC coalesce(dl.sub_id, du.sub_id) as sub_id,
# MAGIC coalesce(dl.rg_name, du.rg_name) as rg_name,
# MAGIC coalesce(dl.resource_name, du.resource_name) as resource_name,
# MAGIC coalesce(dl.id, du.id) as `id`,
# MAGIC coalesce(dl.`cost`, du.`cost`) as `cost`,
# MAGIC coalesce(dl.`interval`, du.`interval`) as `interval`,
# MAGIC coalesce(dl.date, du.date) as `date`,
# MAGIC coalesce(dl.timestamp, du.timestamp) as `timestamp`,
# MAGIC du.AVERAGE as DWU_USED_AVERAGE,
# MAGIC du.MAXIMUM as DWU_USED_MAXIMUM,
# MAGIC du.MINIMUM as DWU_USED_MINIMUM,
# MAGIC dl.AVERAGE as DWU_LIMIT_AVERAGE,
# MAGIC dl.MAXIMUM as DWU_LIMIT_MAXIMUM,
# MAGIC dl.MINIMUM as DWU_LIMIT_MINIMUM
# MAGIC from v_df_dwu_limit dl join v_df_dwu_used du
# MAGIC on
# MAGIC dl.id = du.id
# MAGIC and dl.`date` = du.`date`
# MAGIC and dl.`timestamp` = du.`timestamp`
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from v_synapse_usage_final;
# COMMAND ----------
df_synapse_usage_final = spark.sql("select * from v_synapse_usage_final")
# COMMAND ----------
from delta.tables import DeltaTable
synapse_usage_upsert = DeltaTable.forName(spark, 'platform_dnu.synapse_usage_api') # Hive metastore-based tables
synapse_usage_upsert.alias("t").merge(
df_synapse_usage_final.alias("s"),
"s.id = t.id and s.interval = t.interval and s.timestamp = t.timestamp") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the end
# COMMAND ----------
# %sql
# select * from platform_dnu.synapse_usage_api_landing where
# `id` = '/subscriptions/3a683d84-be08-4356-bb14-3b62df1bad55/resourcegroups/diageo-analytics-prod-rg-funclib/providers/microsoft.sql/servers/azeunfunclibp001/databases/diageo-analytics-prod-asa-funclib-prod01';
That is the complete code.
The exact counts will not matter, as they change with each session.
The issue is that line 52 and line 69 (the counts on the landing table before and after the Auto Loader) should not show the same count when the Auto Loader is reporting ingested files.
And if I run line 69 after a minute or so, rather than immediately, it does show me the updated count.
But I do not want to implement wait(60) logic.
01-25-2023 03:42 PM
Go to the driver logs and check the log4j output. Do you see any "streaming query made progress" messages? Also, go to the "Spark UI" tab and select the "Streaming" sub-tab. Do you see any completed micro-batches?
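In case it helps with that check, the same progress information can also be pulled programmatically from a notebook cell. A small sketch, assuming the stream is still running or the df_dwu_limit handle from your notebook is still in scope:
# List any streaming queries still active on this cluster and print their
# latest progress (lastProgress is None until a micro-batch has completed).
for q in spark.streams.active:
    print(q.id, q.status)
    print(q.lastProgress)

# If the handle returned by toTable() is still in scope, it can be inspected
# directly, even after an availableNow run has stopped:
# print(df_dwu_limit.status)
# print(df_dwu_limit.recentProgress)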