Logging in Azure Data Factory and Databricks Notebooks

Dan_Z, Databricks Employee

Introduction

Today we are looking at logging for Azure Data Factory (ADF) and Databricks Notebooks. Both of these tools separately have great solutions for logging, but they don't mesh well:

  • ADF does not persist logs indefinitely unless you specifically ask it to by configuring diagnostic logging. You can send the logs to a storage location, Log Analytics, or Event Hubs. Otherwise you can only see what happened from within ADF Studio until the runs expire after 45 days.
  • Databricks Notebooks just run code, so unless you set up compute log delivery at the compute level (not very helpful in this case), which exports the stdout, stderr, and Spark logs, you will need some sort of custom logging solution based on log4j or Python's logging package (a minimal sketch of the latter follows this list).
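
For reference, a do-it-yourself approach built on Python's logging package typically looks like the minimal sketch below. It is perfectly fine for producing log lines, but those lines land in the driver's stdout (or a local file), so you would still need to ship them to a central store yourself:

import logging

# Minimal baseline sketch using only the standard library: log lines end up in the
# driver's stdout, not in any central store.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)
log = logging.getLogger("my_notebook")
log.info("Loaded staging table")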

Solution Requirements

The best solution has the following requirements:

  • Persist all logs to the same location
  • Persist all logs indefinitely
  • Ensure the ADF logs for a given notebook and pipeline can be tied to the notebook logs
  • Be simple to set up and maintain

Proposed Solution

The approach laid out in this article is to use ADF's native integration with Azure Log Analytics and then create a custom logging package using Python to send logs from Databricks Notebooks to Azure Log Analytics.

[Figure: Logging overview showing both ADF and Databricks Notebooks sending logs to Azure Log Analytics]

This will enable us to not only keep track of the pipeline and notebook runs, but also to link the logs in a given notebook to the corresponding logs for that notebook activity run in ADF.

Custom Logging Package

In order to send logs from Databricks Notebooks to Log Analytics, we need client code running on Databricks which meets the following requirements:

  • Must run in Databricks notebooks
  • Must be configurable for the Log Analytics instance details:
    • LOGGING_WORKSPACE_ID: the Log Analytics workspace ID
    • LOGGING_WORKSPACE_KEY: the Log Analytics workspace key
    • LOG_TABLE_NAME: the table name logs will be pushed to in Log Analytics
  • Must be configurable within a notebook for the pipeline and activity information so that we can link the notebook logs with the ADF logs for that notebook activity:
    • pipeline run ID
    • pipeline name
    • activity name

Prototype code that achieves this is provided in this GitHub repo. Let's talk about the primary class, which lives in log_analytics.py:

 

from databricks_loganalytics.base_logging import log_console_output
from pyspark.sql import SparkSession, DataFrame
from databricks.sdk.runtime import *
from time import sleep
import os

# Log Analytics credentials are read from cluster-level environment variables
workspace_id: str = os.environ.get('LOGGING_WORKSPACE_ID')
workspace_key: str = os.environ.get('LOGGING_WORKSPACE_KEY')

spark: SparkSession = SparkSession.builder.getOrCreate()

class notebook_logger:
    def __init__(self, pipeline_run_id: str, pipeline_name: str, activity_name: str):
        self.pipeline_run_id: str = pipeline_run_id
        self.pipeline_name: str = pipeline_name
        self.activity_name: str = activity_name


    def log_info(self, *messages) -> None:
        message: str = " ".join([str(m) for m in messages])
        dbx_body: list[dict[str, str]] = [
            {
                "Message": message,
                "pipelineRunId_g": self.pipeline_run_id,
                "pipeline": self.pipeline_name,
                "activity": self.activity_name,
            }
        ]
        res: str = log_console_output(
            workspace_id=workspace_id,
            workspace_key=workspace_key,
            message=dbx_body,
        )
        print(message)
        if res != 'Accepted':
            print(res)
    

    def log_notebook_output(self, output: dict):

        def log_cmd_output(output):
            if isinstance(output, DataFrame):
                try:
                    out = output.collect()
                    if out:
                        outDict = out[0].asDict()
                        # Infer the operation type from the number of columns in the output row
                        match len(outDict):
                            case 2:
                                operation: str = "INSERT"
                            case 4:
                                operation: str = "MERGE"
                            case 1:
                                operation: str = "DELETE or UPDATE"
                            case _:
                                operation: str = "UNKNOWN"
                        if len(out) == 1:
                            message: str = f"{operation}: " + ", ".join([f"{op}: {str(cnt)}" for op, cnt in outDict.items()])
                            self.log_info(message)
                            sleep(0.1)
                except Exception:
                    # If collecting or summarizing the output fails, skip logging it
                    print(f"WARNING: {output} not logged")
            elif isinstance(output, str):
                self.log_info(output)

        for value in output.values():
            log_cmd_output(value)


    def log_inserted_count(self, table):
        message: str = f"INSERTED {spark.read.table(table).count()}"
        self.log_info(message)
    

 

Here we define the class notebook_logger, whose constructor takes arguments used to link logs to their respective ADF activity runs. The best way to approach this is to pass these values in as notebook widgets (populated from ADF parameters), where activity_name matches the name given to the ADF notebook activity. More on this later.
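
For example, constructing a logger directly looks like the sketch below. The values shown are placeholders; in practice they arrive through widgets, as shown in the Databricks Notebook Widgets section later:

from databricks_loganalytics.log_analytics import notebook_logger

# Placeholder values for illustration; in a real run these come in as ADF parameters
logger = notebook_logger(
    pipeline_run_id="00000000-0000-0000-0000-000000000000",
    pipeline_name="MyPipeline",
    activity_name="LoadHistoricalDataIntoSmallTables",
)
logger.log_info("notebook started")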

The three methods provided by notebook_logger offer different ways to send logs to Log Analytics (a combined usage sketch follows this list):

  • log_info: sends a string to Log Analytics. Supports multiple arguments, just like Python's print.
  • log_notebook_output: intended to be run at the bottom of the notebook, like so:
    logger.log_notebook_output(Out)
    where Out is the dictionary into which Databricks Notebooks collects IPython cell output. Any such output will be forwarded to log_info. This is primarily intended to send off the results of MERGE, UPDATE, INSERT, and DELETE operations.
  • log_inserted_count: a convenience method to be used after an INSERT.
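
Putting the three methods together, a typical notebook might use them as in the sketch below (assuming a logger has been constructed as above; the table name is a placeholder):

# Plain string logging; multiple arguments are joined, print-style
logger.log_info("Rows loaded:", 42)

# Convenience helper after an INSERT into a (placeholder) table
logger.log_inserted_count("my_catalog.my_schema.my_table")

# Last cell of the notebook: forward captured cell output (e.g. MERGE/INSERT/UPDATE/DELETE results)
logger.log_notebook_output(Out)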

Setup

Prerequisites

  • A Log Analytics workspace accessible by ADF and Azure Databricks.
  • The Workspace ID and the Workspace key for this Log Analytics workspace.
  • Permissions to set environment variables on your Databricks cluster.

Steps

  1. Integrate ADF and Log Analytics by configuring diagnostic logging in your ADF instance, selecting your Log Analytics workspace as the destination. Make sure to check at least the ActivityRuns, PipelineRuns, and TriggerRuns categories.
  2. Test that it is working by running a test pipeline and then executing the following query in Log Analytics:
    AzureDiagnostics
    | where ResourceId contains "<your-adf-resource-id>"
  3. Add the custom logger to Databricks:
    1. Add the custom logger to your Databricks cluster using the cluster UI.
      1. Put the .whl file here on DBFS
      2. Install the .whl file on your Databricks cluster (reference)
      3. Set environment variables on your Databricks cluster (reference). The LOG_TABLE_NAME can be anything you prefer (a quick sanity check for these variables is sketched after these steps).
        LOGGING_WORKSPACE_ID="<your-workspace-id>"
        LOGGING_WORKSPACE_KEY="<your-workspace-key>"
        LOG_TABLE_NAME="dbrx_notebook_logs"
    2. Alternatively, if you don't want to deal with .whl files, and especially if you are using Git Folders (formerly Databricks Repos), you can add the code in this directory to your Databricks workspace.
    3. Ensure you can import the code by running this Python code in a notebook:
      from databricks_loganalytics.log_analytics import notebook_logger
    4. To ensure everything works, run this code in a Databricks Notebook:
      from databricks_loganalytics.log_analytics import notebook_logger

      logger = notebook_logger("test", "test", "test")
      logger.log_info("This is a test message")
    5. If everything is working properly, you should be able to find this message in Log Analytics using this Kusto query (the table name we set via the LOG_TABLE_NAME environment variable above):
      dbrx_notebook_logs

      If that doesn't work, you can also try "dbrx_notebook_logs_CL" (Log Analytics appends a _CL suffix to custom log tables).
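
Before running the test in step 4, you can confirm that the cluster-level environment variables are actually visible to your notebook. A minimal sanity-check sketch, using only the variable names configured above:

import os

# Confirm the cluster-level environment variables configured above are visible to the notebook
for var in ("LOGGING_WORKSPACE_ID", "LOGGING_WORKSPACE_KEY", "LOG_TABLE_NAME"):
    print(var, "is set" if os.environ.get(var) else "is MISSING")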

Usage

At this point, if you've gotten the tests working, you probably have a good idea of what's going on. Let's clear up any uncertainty with a real example. If you are not familiar with passing parameters between ADF and Databricks notebooks, or are rusty, you can review the docs here.

ADF Activity Parameters

Let's say you have an ADF pipeline activity that looks like this:

[Screenshot: ADF pipeline with a Databricks Notebook activity]

You will need to set the following parameters for the Notebook Activity:

  • PIPELINE_RUN_ID: set to the variable @pipeline().RunId (dynamic)
  • PIPELINE_NAME: set to the variable @pipeline().Pipeline (dynamic)
  • ACTIVITY_NAME: set to the string "LoadHistoricalDataIntoSmallTables".

Important: ACTIVITY_NAME must match the activity name in the ADF notebook activity, otherwise a query in Log Analytics will not be able to match up the logs between the two systems. 

Databricks Notebook Widgets

Your Databricks notebooks will look something like this:

 

from databricks_loganalytics.log_analytics import notebook_logger

# Widgets populated by the ADF notebook activity's base parameters
dbutils.widgets.text("PIPELINE_RUN_ID", "", "PIPELINE_RUN_ID")
dbutils.widgets.text("PIPELINE_NAME", "", "PIPELINE_NAME")
dbutils.widgets.text("ACTIVITY_NAME", "", "ACTIVITY_NAME")
PIPELINE_RUN_ID = dbutils.widgets.get("PIPELINE_RUN_ID")
PIPELINE_NAME = dbutils.widgets.get("PIPELINE_NAME")
ACTIVITY_NAME = dbutils.widgets.get("ACTIVITY_NAME")

logger = notebook_logger(PIPELINE_RUN_ID, PIPELINE_NAME, ACTIVITY_NAME)

# <some notebook code>

logger.log_info("some log message")

# <more notebook code>

# last cell
logger.log_notebook_output(Out)

 

Important note: Out is a variable that Databricks Notebooks populates automatically; you do not need to define it, and make sure not to overwrite it!
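
To make the behavior concrete, here is an illustrative call with a hand-built stand-in for Out (the real Out is populated for you; fake_out and its contents are purely hypothetical):

# Illustrative only: a dict shaped like Out (execution count -> cell output).
# A one-column DataFrame is summarized as a DELETE or UPDATE result; plain strings are logged as-is.
fake_out = {
    1: spark.sql("SELECT 42 AS num_affected_rows"),
    2: "finished loading the staging table",
}
logger.log_notebook_output(fake_out)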

Example Log Analytics Query

Here is a Kusto query you can run in Log Analytics to get all the logs for a given pipeline run and its child pipeline activities. Make sure to fill in the ResourceId and the pipeline RunId (e.g. 92d3f580-3cba-4feb-a1c1-c65bddd83b2f) with your own. Also, we used dbrx_notebook_logs; make sure to change it if you set the LOG_TABLE_NAME environment variable to something else.

 

// --------------------------------------------------------
// Enter the run id below to get the related and child runs
// --------------------------------------------------------
let runid = "<a pipeline runid>";
// get the child pipeline run IDs
let relatedRunIds = AzureDiagnostics 
| where ResourceId contains "<your ResourceId>"
and Category == "PipelineRuns"
| mv-expand pred = todynamic(Predecessors_s)
| project rootRunId = coalesce(pred.PipelineRunId, runId_g), runId_g
| where rootRunId == runid
| union (print runId_g = runid)
| distinct runId_g;
// Run the main query
AzureDiagnostics 
| where ResourceId contains "<your ResourceId>"
and pipelineRunId_g in (relatedRunIds)
and (status_s  == "Succeeded" or status_s == "Failed") 
| join kind=leftouter dbrx_notebook_logs
on $left.pipelineRunId_g == $right.pipelineRunId_g_g 
    and $left.pipelineName_s == $right.pipeline_s
    and $left.activityName_s == $right.activity_s
| project TimeGenerated,pipelineRunId = coalesce(pipelineRunId_g, groupId_g),
  pipelineName_s, activityName_s, status_s,
  Message = coalesce(Message1, Message)
| where Message != "Accepted"
| order by TimeGenerated asc

 

The above query will:

  1. Find all the child pipeline run IDs associated with the pipeline run ID you passed.
  2. Find all the ADF logs associated with those run IDs.
  3. Join them with the Databricks notebook logs.
  4. Massage them for a clean and ordered output.
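
If you'd rather pull these results from Python instead of the Log Analytics UI, the azure-monitor-query and azure-identity packages can run the same KQL programmatically. This sketch is outside the scope of the original setup and assumes those packages are installed and your identity has read access to the workspace:

from datetime import timedelta

from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus

# Paste the full query from above here; a trivial placeholder is shown for brevity
kql = "AzureDiagnostics | take 10"

client = LogsQueryClient(DefaultAzureCredential())
response = client.query_workspace(
    workspace_id="<your-workspace-id>",
    query=kql,
    timespan=timedelta(days=7),
)
if response.status == LogsQueryStatus.SUCCESS:
    for table in response.tables:
        for row in table.rows:
            print(row)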

Conclusion

Congrats! You've set up logging! This solution is more of a proof of concept, and the code provided will not be maintained long-term. Feel free to fork the code or copy it into your own codebase and add methods that achieve your own ends. I rolled everything into a .whl file for convenience, but plan to get your hands dirty: depending on the needs of your organization, expect to customize this logging code and make it your own.