12-03-2022 05:27 AM
I have a simple Auto Loader job which looks like this:
df_dwu_limit = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "JSON") \
.schema(schemaFromJson) \
.load("abfss://synapse-usage@xxxxx.dfs.core.windows.net/synapse-usage/")\
.writeStream \
.format("delta")\
.option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@xxxxxx.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
.trigger(availableNow=True)\
.toTable("platform_dnu.synapse_usage_api_landing")
On the very next line I use that Delta table for further processing.
I also have a count(*) query before and after the Auto Loader step. The count doesn't change, even though the Auto Loader progress output shows that records were written.
If I wait a minute or so and run the count(*) query again, I can see the updated records. How do I solve this issue?
Output of the Auto Loader query progress for one particular session:
{
  "id" : "cb9a28b4-c5b4-4865-bc65-b3ca5efd2537",
  "runId" : "64c2afd9-ad69-4e9a-97bf-d6fa2794931a",
  "name" : null,
  "timestamp" : "2022-12-03T04:44:17.591Z",
  "batchId" : 7,
  "numInputRows" : 27,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.7879760688749453,
  "durationMs" : {
    "addBatch" : 3005,
    "commitOffsets" : 146,
    "getBatch" : 12,
    "latestOffset" : 30380,
    "queryPlanning" : 61,
    "triggerExecution" : 34259,
    "walCommit" : 222
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[abfss://synapse-usage@platformbilling.dfs.core.windows.net/synapse-usage/]",
    "startOffset" : {
      "seqNum" : 2534,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "endOffset" : {
      "seqNum" : 2562,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "latestOffset" : null,
    "numInputRows" : 27,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.7879760688749453,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[abfss://adb-delta-synapse-usage-api@platbillingdatabricks.dfs.core.windows.net/delta/synapse_usage_api_landing]",
    "numOutputRows" : -1
  }
}
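(For reference, this JSON is what the StreamingQuery handle returned by .toTable() exposes; a minimal sketch of pulling it programmatically, assuming that handle is the df_dwu_limit variable above:)
query = df_dwu_limit                   # StreamingQuery returned by .toTable()
print(query.status)                    # is a trigger active / is data available?
if query.lastProgress is not None:     # most recent micro-batch progress, same shape as above
    print(query.lastProgress["batchId"], query.lastProgress["numInputRows"])
for progress in query.recentProgress:  # list of recent micro-batch progress dicts
    print(progress["batchId"], progress["sink"]["numOutputRows"])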
DDL for the Delta table (image attached).
12-03-2022 08:08 AM
Can you share the whole code with the counts you mentioned?
12-03-2022 05:08 PM
# Databricks notebook source
from pyspark.sql.functions import col, explode, upper, lower, lit, floor, coalesce, to_date, date_format, to_timestamp
from pyspark.sql.types import StructType
import json
from azure.storage.blob import BlobServiceClient, ContentSettings
from datetime import datetime
# COMMAND ----------
spark.conf.set("fs.azure.account.auth.type.XXXXXX.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.XXXXXX.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.XXXXXX.dfs.core.windows.net", "")
spark.conf.set("fs.azure.account.auth.type.YYYYYYY.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.YYYYYYY.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.YYYYYYY.dfs.core.windows.net", "")
schema_string = "{\"fields\":[{\"metadata\":{},\"name\":\"DWU_LIMIT\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"DWU_USED\",\"nullable\":true,\"type\":{\"containsNull\":true,\"elementType\":{\"fields\":[{\"metadata\":{},\"name\":\"average\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"maximum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"minimum\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"timeStamp\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"},\"type\":\"array\"}},{\"metadata\":{},\"name\":\"cost\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"interval\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"resource_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"rg_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"sub_id\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}"
schemaFromJson = StructType.fromJson(json.loads(schema_string))
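# (Reference sketch only, not used below: the escaped JSON string above is equivalent to
#  this explicit StructType, which is easier to read at a glance.)
# from pyspark.sql.types import StructField, ArrayType, StringType, DoubleType, LongType
# metric_point = StructType([
#     StructField("average", DoubleType(), True),
#     StructField("maximum", DoubleType(), True),
#     StructField("minimum", DoubleType(), True),
#     StructField("timeStamp", StringType(), True),
# ])
# schemaFromJson = StructType([
#     StructField("DWU_LIMIT", ArrayType(metric_point, True), True),
#     StructField("DWU_USED", ArrayType(metric_point, True), True),
#     StructField("cost", LongType(), True),
#     StructField("id", StringType(), True),
#     StructField("interval", StringType(), True),
#     StructField("resource_name", StringType(), True),
#     StructField("rg_name", StringType(), True),
#     StructField("sub_id", StringType(), True),
# ])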
# COMMAND ----------
# MAGIC %sql
# MAGIC DROP VIEW IF EXISTS v_synapse_usage_final;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_limit;
# MAGIC DROP VIEW IF EXISTS v_df_dwu_used;
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the start
# COMMAND ----------
# ##print schema
# files_to_process = 'abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/'
# df_json_from_adls = spark.read\
# .option("recursiveFileLookup", "True")\
# .json(files_to_process)
# print(df_json_from_adls.schema.json())
# # #df_json_from_adls.write.saveAsTable('dnu_synpase_tmp')
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the start
# COMMAND ----------
df_dwu_limit = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "JSON") \
.schema(schemaFromJson) \
.load("abfss://synapse-usage@XXXXXX.dfs.core.windows.net/synapse-usage/")\
.writeStream \
.format("delta")\
.option("checkpointLocation", "abfss://adb-delta-synapse-usage-api@YYYYYYY.dfs.core.windows.net/checkpoint_synapse_usage_api_landing/") \
.trigger(availableNow=True)\
.toTable("platform_dnu.synapse_usage_api_landing")
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api_landing; -- record count at the end
# COMMAND ----------
df_dwu_limit_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_LIMIT FROM platform_dnu.synapse_usage_api_landing")
df_dwu_used_base = spark.sql("SELECT sub_id, rg_name, resource_name, cost, `interval`, id, DWU_USED FROM platform_dnu.synapse_usage_api_landing")
# COMMAND ----------
df_dwu_limit = df_dwu_limit_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_LIMIT' type", "explode(DWU_LIMIT) DWU_LIMIT")\
.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_LIMIT['average'] as AVERAGE", "DWU_LIMIT['maximum'] as MAXIMUM", "DWU_LIMIT['minimum'] as MINIMUM", "to_date(DWU_LIMIT['timestamp']) as date", "to_timestamp(DWU_LIMIT['timestamp']) as timestamp")
df_dwu_used = df_dwu_used_base.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "'DWU_USED' type", "explode(DWU_USED) DWU_USED")\
.selectExpr("sub_id", "rg_name", "resource_name", "cost", "id", "`interval`", "type", "DWU_USED['average'] as AVERAGE", "DWU_USED['maximum'] as MAXIMUM", "DWU_USED['minimum'] as MINIMUM", "to_date(DWU_USED['timestamp']) as date", "to_timestamp(DWU_USED['timestamp']) as timestamp")
# COMMAND ----------
df_dwu_limit.createOrReplaceTempView("v_df_dwu_limit")
df_dwu_used.createOrReplaceTempView("v_df_dwu_used")
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC CREATE OR REPLACE TEMPORARY VIEW v_synapse_usage_final
# MAGIC as
# MAGIC select
# MAGIC coalesce(dl.sub_id, du.sub_id) as sub_id,
# MAGIC coalesce(dl.rg_name, du.rg_name) as rg_name,
# MAGIC coalesce(dl.resource_name, du.resource_name) as resource_name,
# MAGIC coalesce(dl.id, du.id) as `id`,
# MAGIC coalesce(dl.`cost`, du.`cost`) as `cost`,
# MAGIC coalesce(dl.`interval`, du.`interval`) as `interval`,
# MAGIC coalesce(dl.date, du.date) as `date`,
# MAGIC coalesce(dl.timestamp, du.timestamp) as `timestamp`,
# MAGIC du.AVERAGE as DWU_USED_AVERAGE,
# MAGIC du.MAXIMUM as DWU_USED_MAXIMUM,
# MAGIC du.MINIMUM as DWU_USED_MINIMUM,
# MAGIC dl.AVERAGE as DWU_LIMIT_AVERAGE,
# MAGIC dl.MAXIMUM as DWU_LIMIT_MAXIMUM,
# MAGIC dl.MINIMUM as DWU_LIMIT_MINIMUM
# MAGIC from v_df_dwu_limit dl join v_df_dwu_used du
# MAGIC on
# MAGIC dl.id = du.id
# MAGIC and dl.`date` = du.`date`
# MAGIC and dl.`timestamp` = du.`timestamp`
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from v_synapse_usage_final;
# COMMAND ----------
df_synapse_usage_final = spark.sql("select * from v_synapse_usage_final")
# COMMAND ----------
from delta.tables import DeltaTable
synapse_usage_upsert = DeltaTable.forName(spark, 'platform_dnu.synapse_usage_api') # Hive metastore-based tables
synapse_usage_upsert.alias("t").merge(
df_synapse_usage_final.alias("s"),
"s.id = t.id and s.interval = t.interval and s.timestamp = t.timestamp") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# COMMAND ----------
# MAGIC %sql
# MAGIC select count(*) from platform_dnu.synapse_usage_api; -- record count at the end
# COMMAND ----------
# %sql
# select * from platform_dnu.synapse_usage_api_landing where
# `id` = '/subscriptions/3a683d84-be08-4356-bb14-3b62df1bad55/resourcegroups/diageo-analytics-prod-rg-funclib/providers/microsoft.sql/servers/azeunfunclibp001/databases/diageo-analytics-prod-asa-funclib-prod01';
That's the complete code.
The exact counts don't matter, since they change with every session.
The issue is that lines 52 and 69 (the count(*) queries just before and just after the Auto Loader cell) should not return the same count when the Auto Loader output shows ingested files.
If I run line 69 after a minute or so, rather than immediately, it does show the updated count.
But I do not want to implement a wait(60) workaround.
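For reference, since .toTable() returns a StreamingQuery handle, one alternative to a fixed sleep (a sketch only, not something already in the job) would be to block until the availableNow run finishes before running the next count(*):
query = df_dwu_limit        # StreamingQuery handle returned by .toTable() above
query.awaitTermination()    # with trigger(availableNow=True) the query stops on its own once all
                            # pending files are processed, so the count(*) that follows
                            # sees the newly committed micro-batch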
01-25-2023 03:42 PM
Go to the driver logs and check the log4j output. Do you see any "Streaming query made progress" entries? Also, go to the Spark UI tab and select the Streaming sub-tab. Do you see any completed micro-batches?
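One more check you could run (a sketch, using the landing table name from your notebook): the Delta history of the landing table will show whether the streaming micro-batch actually committed.
from delta.tables import DeltaTable
landing = DeltaTable.forName(spark, "platform_dnu.synapse_usage_api_landing")
# streaming writes typically appear as a "STREAMING UPDATE" operation in the history
landing.history(5).select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)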