You can create a pair of custom functions that log events as JSON files to a data lake, and then use Structured Streaming (Auto Loader) to read those files from the data lake into a Delta table (a sketch of that streaming read is shown after the functions below).
%scala
// Functions
import spark.implicits._

def set_local_variables(): Unit = {
  // get the variables needed from the notebook context and cluster config
  val user_name: String = dbutils.notebook.getContext.tags("user")
  val notebook_path: Option[String] = dbutils.notebook.getContext.notebookPath
  val cluster_name: String = spark.conf.get("spark.databricks.clusterUsageTags.clusterName")

  // put the values in a DataFrame
  val data = Seq(
    ("user_name", user_name),
    ("notebook_path", notebook_path.getOrElse("")),
    ("cluster_name", cluster_name)
  )
  val df = data.toDF("var_name", "value")

  // create a temp view so the Python code below can read these values
  df.createOrReplaceTempView("local_variables")
}

// Run function
set_local_variables()
--------------------------------------------------------
def get_context_variables():
    # collect the current user, cluster name and notebook path (from the temp view created above)
    user_name = spark.sql('select current_user()').collect()[0][0]
    cluster_name = spark.conf.get("spark.databricks.clusterUsageTags.clusterName")
    notebook_name = spark.sql("select value from local_variables where var_name = 'notebook_path'").collect()[0][0]
    final_dict = {
        'user_name': user_name,
        'cluster_name': cluster_name,
        'notebook_name': notebook_name
    }
    return final_dict
def log_custom_event(event_name, event_properties):
    import json
    from time import sleep
    from datetime import datetime

    # set paths
    function_production_datalake_path = "[Data Lake Path]"

    # logging can be switched off via a Spark conf flag; default to enabled
    try:
        is_enabled = spark.conf.get('core_tools.logging.enabled')
    except Exception:
        is_enabled = 'true'

    if str(is_enabled).lower() == 'true':
        # event_properties may come in as a JSON string or as a dict
        try:
            event_properties = json.loads(event_properties)
        except (TypeError, ValueError):
            # already a dict (or not valid JSON); keep it as-is
            pass

        # get local variables
        context_dict = get_context_variables()
        user_name = context_dict['user_name']
        cluster_name = context_dict['cluster_name']
        notebook_path = context_dict['notebook_name']

        # make the notebook path file-name safe for the log file
        notebook_path_log = notebook_path.replace('/', '_').replace(' ', '_')

        # add context to the event properties
        event_properties['user_name'] = user_name
        event_properties['notebook_path'] = notebook_path
        event_properties['cluster_name'] = cluster_name

        # set event
        event = {
            'name': event_name,
            'properties': json.dumps(event_properties),
            'as_of': str(datetime.now())
        }

        # TODO: build autoloader for this
        # timestamp and partition folders (year/month/day/hour)
        timestamp = str(datetime.now()).replace(' ', '').replace(':', '').replace('-', '').replace('.', '')
        current_timestamp = datetime.now()
        year = current_timestamp.year
        month = '{:02}'.format(current_timestamp.month)
        day = '{:02}'.format(current_timestamp.day)
        hour = current_timestamp.hour

        # try to write the event file; retry once after a short pause if the first write fails
        try:
            try:
                dbutils.fs.put(f'{function_production_datalake_path}/_events/_custom_logging_events/{year}/{month}/{day}/{hour}/{timestamp}-{notebook_path_log}-{user_name}.json', json.dumps(event))
            except Exception:
                sleep(1)
                timestamp = str(datetime.now()).replace(' ', '').replace(':', '').replace('-', '').replace('.', '')
                dbutils.fs.put(f'{function_production_datalake_path}/_events/_custom_logging_events/{year}/{month}/{day}/{hour}/retry-{timestamp}-{notebook_path_log}-{user_name}.json', json.dumps(event))
        except Exception as e:
            print(str(e))
            print('Skipping autoloader logging')
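--------------------------------------------------------
To finish the Structured Streaming part mentioned at the top (the TODO inside the function), a minimal sketch could use Auto Loader to pick up the JSON event files and append them to a Delta table. The event name/properties in the example call, the checkpoint/schema location, and the target table name are assumptions for illustration, not part of the original code:

# Example call (event name and properties are illustrative)
log_custom_event('data_load_finished', {'table': 'sales', 'row_count': 1000})

# Minimal Auto Loader sketch: stream the logged JSON events into a Delta table
# (checkpoint path, schema location and target table name are assumed placeholders)
events_path = "[Data Lake Path]/_events/_custom_logging_events/"
checkpoint_path = "[Data Lake Path]/_checkpoints/custom_logging_events/"

events_stream = (
    spark.readStream
        .format("cloudFiles")                        # Auto Loader
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .load(events_path)
)

(
    events_stream.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)                  # process what has landed, then stop
        .toTable("logging.custom_events")            # hypothetical target Delta table
)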