Today we are looking at logging for Azure Data Factory (ADF) and Databricks Notebooks. Both of these tools have great logging solutions on their own, but they don't mesh well together: out of the box, the ADF pipeline logs and the Databricks notebook logs live in different places and can't easily be correlated.
Ideally, a solution would put both sets of logs in one place and let you link a notebook's logs to the ADF activity run that triggered it.
The approach laid out in this article is to use ADF's native integration with Azure Log Analytics and then create a custom logging package using Python to send logs from Databricks Notebooks to Azure Log Analytics.
This will enable us to not only keep track of the pipeline and notebook runs, but also link the logs in a given notebook to the corresponding logs for that notebook activity run in ADF.
In order to send logs from Databricks Notebooks to Log Analytics, we need client code running on Databricks that can write to the Log Analytics workspace and that tags every record with enough context to join it back to the ADF run that produced it.
Prototype code that achieves this is provided in this GitHub repo. Let's talk about the primary class, which lives in log_analytics.py:
from databricks_loganalytics.base_logging import log_console_output
from pyspark.sql import SparkSession, DataFrame
from databricks.sdk.runtime import *
from time import sleep
import os

# Log Analytics workspace credentials, read from the cluster environment
workspace_id: str = os.environ.get("LOGGING_WORKSPACE_ID")
workspace_key: str = os.environ.get("LOGGING_WORKSPACE_KEY")

spark: SparkSession = SparkSession.builder.getOrCreate()


class notebook_logger:
    def __init__(self, pipeline_run_id: str, pipeline_name: str, activity_name: str):
        self.pipeline_run_id: str = pipeline_run_id
        self.pipeline_name: str = pipeline_name
        self.activity_name: str = activity_name

    def log_info(self, *messages) -> None:
        # Join all arguments into one message and tag it with the ADF context
        message: str = " ".join([str(m) for m in messages])
        dbx_body: list[dict[str, str]] = [
            {
                "Message": message,
                "pipelineRunId_g": self.pipeline_run_id,
                "pipeline": self.pipeline_name,
                "activity": self.activity_name,
            }
        ]
        res: str = log_console_output(
            workspace_id=workspace_id,
            workspace_key=workspace_key,
            message=dbx_body,
        )
        print(message)
        if res != "Accepted":
            print(res)

    def log_notebook_output(self, output: dict):
        def log_cmd_output(output):
            if isinstance(output, DataFrame):
                try:
                    out = output.collect()
                    if out:
                        outDict = out[0].asDict()
                        # The column count of the result DataFrame tells us
                        # which kind of statement produced it
                        match len(outDict):
                            case 2:
                                operation: str = "INSERT"
                            case 4:
                                operation: str = "MERGE"
                            case 1:
                                operation: str = "DELETE or UPDATE"
                            case _:
                                operation: str = "UNKNOWN"
                        if len(out) == 1:
                            message: str = f"{operation}: " + ", ".join(
                                [f"{op}: {str(cnt)}" for op, cnt in outDict.items()]
                            )
                            self.log_info(message)
                            sleep(0.1)
                except Exception:
                    # Handle any exceptions that may occur
                    print(f"WARNING: {output} not logged")
            elif isinstance(output, str):
                self.log_info(output)

        # Out is a dict of {execution count: cell output}; log each entry
        for _, values in output.items():
            log_cmd_output(values)

    def log_inserted_count(self, table):
        message: str = f"INSERTED {spark.read.table(table).count()}"
        self.log_info(message)
Here we define the class notebook_logger, whose constructor takes arguments that are used to link logs to their respective ADF activity runs. The best way to supply these values is through notebook widgets populated by ADF parameters, with activity_name matching the name given to the ADF notebook activity. More on this later.
The three methods provided by notebook_logger, log_info, log_notebook_output, and log_inserted_count, offer different ways to send logs to Log Analytics. log_info sends an arbitrary message, and log_inserted_count logs the row count of a table. If you call

logger.log_notebook_output(Out)

where Out is the dictionary in which Databricks Notebooks (via IPython) caches cell outputs, then the captured cell outputs are sent on to log_info. This is really intended to send off the results of MERGE, UPDATE, INSERT, and DELETE operations.

On the ADF side, the native integration with Log Analytics means you only need to configure diagnostic settings on your Data Factory to send pipeline and activity run logs to the workspace. You can confirm those logs are arriving with a query like:

AzureDiagnostics
| where ResourceId contains "<your-adf-resource-id>"
On the Databricks side, the client code expects the following environment variables (for example, set in the cluster's configuration):

LOGGING_WORKSPACE_ID="<your-workspace-id>"
LOGGING_WORKSPACE_KEY="<your-workspace-key>"
LOG_TABLE_NAME="dbrx_notebook_logs"
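The actual HTTP call to Log Analytics is made by log_console_output in base_logging.py, which isn't reproduced in this article. As a rough sketch of what such a sender could look like, assuming it uses the standard Log Analytics HTTP Data Collector API and reads the table name from LOG_TABLE_NAME (both assumptions on my part; check the repo for the real implementation):

# NOTE: this is an illustrative sketch of a Data Collector API sender,
# not the actual contents of base_logging.py.
import base64
import hashlib
import hmac
import json
import os
from datetime import datetime, timezone

import requests


def log_console_output(workspace_id: str, workspace_key: str, message: list[dict]) -> str:
    """Post a batch of records to a Log Analytics custom table."""
    # The custom table name; defaults to the value used throughout this article
    log_type = os.environ.get("LOG_TABLE_NAME", "dbrx_notebook_logs")
    body = json.dumps(message).encode("utf-8")
    rfc1123_date = datetime.now(timezone.utc).strftime("%a, %d %b %Y %H:%M:%S GMT")

    # Build the SharedKey signature required by the Data Collector API
    string_to_sign = (
        f"POST\n{len(body)}\napplication/json\nx-ms-date:{rfc1123_date}\n/api/logs"
    )
    signature = base64.b64encode(
        hmac.new(
            base64.b64decode(workspace_key),
            string_to_sign.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
    ).decode()

    response = requests.post(
        f"https://{workspace_id}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"SharedKey {workspace_id}:{signature}",
            "Log-Type": log_type,
            "x-ms-date": rfc1123_date,
        },
    )
    # notebook_logger treats the string "Accepted" as success, so mirror that here
    return "Accepted" if response.ok else f"{response.status_code}: {response.text}"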
With the package installed on your Databricks cluster, you can run a quick end-to-end test from a notebook:

from databricks_loganalytics.log_analytics import notebook_logger
logger = notebook_logger("test", "test", "test")
logger.log_info("This is a test message")
Then confirm the message arrived by querying the custom table in Log Analytics:

dbrx_notebook_logs
If that doesn't work, you can also try "dbrx_notebook_logs_CL" (the _CL suffix is what Log Analytics appends to custom log tables).
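If you want to narrow that down to just your test messages, a filter along these lines should work (pipeline_s is the suffixed column name Log Analytics generates for the custom pipeline field, the same one the join query further down relies on):

dbrx_notebook_logs
| where pipeline_s == "test"
| order by TimeGenerated desc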
At this point, if you've gotten the tests working, you probably have a good idea of what's going on. Let's clear up any remaining uncertainty with a real example. If you are not familiar with passing parameters between ADF and Databricks notebooks, or are a bit rusty, you can review the docs here.
Let's say you have an ADF pipeline activity that looks like this:
You will need to pass PIPELINE_RUN_ID, PIPELINE_NAME, and ACTIVITY_NAME to the Notebook Activity as base parameters (an example configuration follows below).
Important: ACTIVITY_NAME must match the name of the ADF notebook activity; otherwise the Log Analytics query will not be able to match up the logs between the two systems.
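One reasonable way to populate these base parameters, using ADF system variables for the first two (this is my suggested configuration, not necessarily the exact setup from the original pipeline), is:

PIPELINE_RUN_ID: @pipeline().RunId
PIPELINE_NAME: @pipeline().Pipeline
ACTIVITY_NAME: "<the exact name of this notebook activity>"

ADF does not expose the current activity's name as a system variable, which is why ACTIVITY_NAME has to be typed in by hand and kept in sync with the activity's name.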
Your Databricks notebooks will look something like this:
from databricks_loganalytics.log_analytics import notebook_logger
dbutils.widgets.text("PIPELINE_RUN_ID","","PIPELINE_RUN_ID")
dbutils.widgets.text("PIPELINE_NAME","","PIPELINE_NAME")
dbutils.widgets.text("ACTIVITY_NAME","","ACTIVITY_NAME")
PIPELINE_RUN_ID=dbutils.widgets.get("PIPELINE_RUN_ID")
PIPELINE_NAME=dbutils.widgets.get("PIPELINE_NAME")
ACTIVITY_NAME=dbutils.widgets.get("ACTIVITY_NAME")
logger = notebook_logger(PIPELINE_RUN_ID, PIPELINE_NAME, ACTIVITY_NAME)
# <some notebook code>
logger.log_info("some log message")
# <more notebook code>
# last cell
logger.log_notebook_output(Out)
Important note: Out is a variable that Databricks Notebooks populates automatically; you do not need to define it, and make sure not to overwrite it!
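To make that concrete, here is a sketch of the kind of cell output log_notebook_output picks up (the table names are placeholders). A Delta Lake MERGE run through spark.sql returns a one-row metrics DataFrame with four columns, which is what the match statement in log_cmd_output keys off:

# leaving the result DataFrame as the last expression of a cell puts it into Out
spark.sql("""
    MERGE INTO target t
    USING updates u
    ON t.id = u.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# later, in the last cell:
logger.log_notebook_output(Out)
# logs something along the lines of:
# "MERGE: num_affected_rows: 10, num_updated_rows: 7, num_deleted_rows: 0, num_inserted_rows: 3"

# log_inserted_count is handy when a cell's output isn't captured, e.g. after a write:
logger.log_inserted_count("target")  # logs "INSERTED <row count of target>"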
Here is a Kusto query you can run in Log Analytics to get all the logs for a given pipeline run and its child pipeline activities. Make sure to fill in the ResourceId and a pipeline RunId (e.g. 92d3f580-3cba-4feb-a1c1-c65bddd83b2f) with your own values. We also used dbrx_notebook_logs here; change it if you set the LOG_TABLE_NAME environment variable to something else.
// --------------------------------------------------------
// Enter the run id below to get the related and child runs
// --------------------------------------------------------
let runid = "<a pipeline runid>";
// get the child pipeline run IDs
let relatedRunIds = AzureDiagnostics
| where ResourceId contains "<your ResourceId>"
and Category == "PipelineRuns"
| mv-expand pred = todynamic(Predecessors_s)
| project rootRunId = coalesce(pred.PipelineRunId, runId_g), runId_g
| where rootRunId == runid
| union (print runId_g = runid)
| distinct runId_g;
// Run the main query
AzureDiagnostics
| where ResourceId contains "<your ResourceId>"
and pipelineRunId_g in (relatedRunIds)
and (status_s == "Succeeded" or status_s == "Failed")
| join kind=leftouter dbrx_notebook_logs
on $left.pipelineRunId_g == $right.pipelineRunId_g_g
and $left.pipelineName_s == $right.pipeline_s
and $left.activityName_s == $right.activity_s
| project TimeGenerated, pipelineRunId = coalesce(pipelineRunId_g, groupId_g),
    pipelineName_s, activityName_s, status_s,
    Message = coalesce(Message1, Message)
| where Message != "Accepted"
| order by TimeGenerated asc
The above query will take the run ID you supply, find any child pipeline runs from the PipelineRuns category, pull the Succeeded and Failed activity records for all of those runs, join in the matching Databricks notebook logs on run ID, pipeline name, and activity name, and return everything ordered by time.
Congrats! You've set up logging! This solution is more of a proof of concept, and the code provided will not be maintained long-term. Feel free to fork it or copy it into your own codebase and add methods that serve your own ends. I rolled everything into a .whl file for convenience, but plan on getting your hands dirty: depending on the needs of your organization, expect to customize this logging code and make it your own.