Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to log the errors?

karthikmani
New Contributor

We have a notebook with a generic framework that we built to run for multiple tables every day. We want to log successes, errors, and exceptions to a log table so that we can troubleshoot from the log later. How can we implement this? Can anyone share a sample? Thanks.

 

//Karthik

1 ACCEPTED SOLUTION

Accepted Solutions

nayan_wylde
Valued Contributor III

You can create some custom functions to log the events, write them as files to a data lake, and then use structured streaming to read those files from the data lake into a Delta table (a sketch of that streaming read follows the logging functions below).

%scala

// Functions
def set_local_variables() = {
 
    // make sure the SparkSession implicits are in scope for toDF
    import spark.implicits._
 
    // get the variables needed
    val user_name : String = dbutils.notebook.getContext.tags("user")
    val notebook_path : Option[String] = dbutils.notebook.getContext.notebookPath
    val cluster_name : String = spark.conf.get("spark.databricks.clusterUsageTags.clusterName")
 
    // put the values in a dataframe
    val columns = Seq("var_name", "value")
    val data = Seq(("user_name", user_name), ("notebook_path", notebook_path.get), ("cluster_name", cluster_name))
    val df = spark.sparkContext.parallelize(data).toDF(columns: _*)
 
    // create a temp view so the Python cells below can read these values
    df.createOrReplaceTempView("local_variables")
 
}

// Run function
set_local_variables()
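
A side note on the Scala cell above: it only exists to expose the notebook path (and friends) to the Python cells through a temp view. If you prefer to stay in Python, you can usually read the notebook path straight from the dbutils context bridge instead (worth verifying on your cluster's access mode), for example:

# Optional alternative to the Scala temp view: read the notebook path
# directly from the Python dbutils context bridge (may not be available
# on all cluster access modes)
notebook_path = (
    dbutils.notebook.entry_point.getDbutils().notebook()
    .getContext().notebookPath().get()
)
print(notebook_path)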

--------------------------------------------------------
def get_context_variables():
  # gather the context values (current user, cluster, notebook path from the temp view) into a dict
  user_name = spark.sql('select current_user()').collect()[0][0]
  cluster_name = spark.conf.get("spark.databricks.clusterUsageTags.clusterName")
  notebook_name = spark.sql("select value from local_variables WHERE var_name = 'notebook_path'").collect()[0][0]
  final_dict = {
    'user_name':user_name,
    'cluster_name':cluster_name,
    'notebook_name':notebook_name
  }
  return final_dict


def log_custom_event(event_name, event_properties):
  from time import sleep
  import json
 
  # set paths
  function_production_datalake_path = "[Data Lake Path]"
 
  try:
    is_enabled = spark.conf.get('core_tools.logging.enabled')
  except:
    is_enabled = 'true'

  if str(is_enabled).lower()=='true':
    try:
      event_properties = json.loads(event_properties)
    except:
      pass

    # datetime is used for the event timestamp and file path (sleep and json are imported above)
    from datetime import datetime
   
    # get local variables
    context_dict = get_context_variables()
    user_name = context_dict['user_name']
    cluster_name = context_dict['cluster_name']
    notebook_path = context_dict['notebook_name']

    # set notebook path for log
    notebook_path_log = notebook_path.replace('/', '_').replace(' ', '_')

    # add event properties
    event_properties['user_name']=user_name
    event_properties['notebook_path']=notebook_path
    event_properties['cluster_name']=cluster_name

    # set event
    event = {
      'name':event_name,
      'properties':json.dumps(event_properties),
      'as_of': str(datetime.now())
    }
   
    # TODO: build the Auto Loader ingestion for these event files (a sketch follows below)
    # timestamp and file
    timestamp = str(datetime.now()).replace(' ', '').replace(':','').replace('-', '').replace('.', '')
    current_timestamp = datetime.now()
    year = current_timestamp.year
    month = '{:02}'.format(current_timestamp.month)
    day = '{:02}'.format(current_timestamp.day)
    hour = current_timestamp.hour
   
    # try to log the file
    try:      
      try:
        dbutils.fs.put(f'{function_production_datalake_path}/_events/_custom_logging_events/{year}/{month}/{day}/{hour}/{timestamp}-{notebook_path_log}-{user_name}.json', json.dumps(event))
      except:
        sleep(1)
        timestamp = str(datetime.now()).replace(' ', '').replace(':','').replace('-', '')
        dbutils.fs.put(f'{function_production_datalake_path}/_events/_custom_logging_events/{year}/{month}/{day}/{hour}/retry-{timestamp}-{notebook_path_log}-{user_name}.json', json.dumps(event))
    except Exception as e:
      print(str(e))
      print('Skipping Auto Loader logging')
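
To answer the original question of recording success/error per table, you can wrap each table run in a try/except and call log_custom_event with the outcome. A minimal sketch, where tables, process_table and the event names are placeholders for your own framework:

# Sketch: emit one event per table processed by the framework.
# `tables` and `process_table` are placeholders for your own framework.
tables = ['bronze.customers', 'bronze.orders']

def process_table(table_name):
  # stand-in for your existing per-table load logic
  spark.table(table_name).count()

for table_name in tables:
  try:
    process_table(table_name)
    log_custom_event('table_load', {'table_name': table_name, 'status': 'success'})
  except Exception as e:
    log_custom_event('table_load', {'table_name': table_name, 'status': 'error', 'error_message': str(e)})
    # re-raise or continue with the next table, depending on how the framework should behave
    raise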

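For the TODO above, the structured streaming side could look roughly like this: Auto Loader picks up the JSON event files and appends them to a Delta log table that you can query when troubleshooting. The paths, schema and table name are placeholders, so adjust them to your layout:

from pyspark.sql.types import StructType, StructField, StringType

# Placeholder locations - align these with the path used in log_custom_event
events_path = "[Data Lake Path]/_events/_custom_logging_events/"
checkpoint_path = "[Data Lake Path]/_checkpoints/_custom_logging_events/"
log_table = "logging.custom_events"

# The JSON files written by log_custom_event carry three string fields
event_schema = StructType([
  StructField("name", StringType()),
  StructField("properties", StringType()),
  StructField("as_of", StringType())
])

# Auto Loader: incrementally ingest new event files and append them to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema(event_schema)
  .load(events_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)   # process all pending files, then stop
  .toTable(log_table)
)

The properties column stays a JSON string in the log table; once the fields you care about are stable you can parse it with from_json in a view on top of the table.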

