2 weeks ago
I'm looking for best practices and guidance on setting up observability for serverless Databricks. Specifically, I'd like to know:
How to capture and monitor system-level metrics (CPU, memory, network, disk) in a serverless setup.
How to configure and collect application metrics (e.g., using Spark listeners, StreamingQueryListener, QueryExecutionListener).
Best way to manage and forward logs (driver logs, executor logs, audit logs, event logs) in a serverless environment.
Recommended approaches for integrating with external monitoring tools like Amazon CloudWatch, Datadog, or SIEM platforms.
Suggestions for building dashboards, alerts, and anomaly detection to ensure end-to-end observability.
If anyone has implemented observability for serverless Databricks workloads, I'd really appreciate insights into the architecture, tools used, and lessons learned.
Thanks in advance for your help!
2 weeks ago
Here are a few recommended methods:
import requests

def get_cluster_metrics(workspace_url, token, cluster_id):
    headers = {"Authorization": f"Bearer {token}"}

    # Get cluster events (this endpoint expects a POST with a JSON body)
    events_url = f"{workspace_url}/api/2.0/clusters/events"
    events_response = requests.post(
        events_url,
        headers=headers,
        json={"cluster_id": cluster_id}
    )

    # Get cluster details, including state and resource configuration
    details_url = f"{workspace_url}/api/2.0/clusters/get"
    details_response = requests.get(
        details_url,
        headers=headers,
        params={"cluster_id": cluster_id}
    )

    return {
        "events": events_response.json(),
        "details": details_response.json()
    }
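A quick usage sketch (the workspace URL, secret scope/key, and cluster ID are placeholders; note this REST path targets classic clusters and jobs compute rather than serverless compute):

# Usage sketch; all identifiers below are placeholders.
workspace_url = "https://<your-workspace>.cloud.databricks.com"
token = dbutils.secrets.get(scope="observability", key="databricks-pat")
metrics = get_cluster_metrics(workspace_url, token, cluster_id="0101-123456-abcdef12")
print(metrics["details"].get("state"), len(metrics["events"].get("events", [])))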
How to configure and collect application metrics (e.g., using Spark listeners, StreamingQueryListener, QueryExecutionListener).
Spark listeners (e.g., SparkListener, QueryExecutionListener) and StreamingQueryListener work in serverless because they're application-level. Register them in your notebook/job and push metrics out (HTTP to a gateway, StatsD, or directly to a lakehouse table). Note that some runtime/version caveats exist by language; check your DBR version.
For structured streaming, attach a StreamingQueryListener to publish: input rows/sec, batch duration, state ops, watermark, last progress. Persist to Delta tables or ship to CloudWatch/Datadog via HTTPS from the driver.
For SQL workloads, use SQL warehouse built-in telemetry plus query history tables to compute p95 latency, scan bytes, error rates (if you also run serverless SQL).
import math
import boto3
from pyspark.sql.streaming import StreamingQueryListener

class CustomStreamingQueryListener(StreamingQueryListener):
    def __init__(self, cloudwatch_client):
        self.cloudwatch = cloudwatch_client

    def onQueryStarted(self, event):
        """Track query start events"""
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=[
                {
                    'MetricName': 'StreamingQueryStarted',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'QueryId', 'Value': str(event.id)},
                        {'Name': 'QueryName', 'Value': event.name or 'unnamed'}
                    ]
                }
            ]
        )

    def onQueryProgress(self, event):
        """Track query progress metrics"""
        progress = event.progress
        metrics = [
            {
                'MetricName': 'InputRowsPerSecond',
                'Value': progress.inputRowsPerSecond,
                'Unit': 'Count/Second'
            },
            {
                'MetricName': 'ProcessedRowsPerSecond',
                'Value': progress.processedRowsPerSecond,
                'Unit': 'Count/Second'
            },
            {
                'MetricName': 'BatchDuration',
                'Value': progress.batchDuration,
                'Unit': 'Milliseconds'
            }
        ]
        # Idle triggers can report None/NaN rates, which CloudWatch rejects.
        metrics = [m for m in metrics
                   if m['Value'] is not None and not math.isnan(float(m['Value']))]
        if metrics:
            self.cloudwatch.put_metric_data(
                Namespace='Databricks/Streaming',
                MetricData=metrics
            )

    def onQueryTerminated(self, event):
        """Track query termination"""
        status = "Success" if event.exception is None else "Failed"
        self.cloudwatch.put_metric_data(
            Namespace='Databricks/Streaming',
            MetricData=[
                {
                    'MetricName': 'StreamingQueryCompleted',
                    'Value': 1,
                    'Unit': 'Count',
                    'Dimensions': [
                        {'Name': 'Status', 'Value': status}
                    ]
                }
            ]
        )

# Register the listener
cloudwatch = boto3.client('cloudwatch')
listener = CustomStreamingQueryListener(cloudwatch)
spark.streams.addListener(listener)
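If you prefer the Delta-table path mentioned above over CloudWatch, here is a minimal sketch that appends each micro-batch's progress to a table; the table name is a placeholder, and on serverless/Spark Connect you should confirm that driver-side writes from listener callbacks are supported on your runtime version.

from pyspark.sql.streaming import StreamingQueryListener

class DeltaProgressListener(StreamingQueryListener):
    """Append selected streaming progress metrics to a Delta table (sketch)."""

    def __init__(self, spark_session, table_name="observability.streaming_progress"):
        self.spark = spark_session
        self.table_name = table_name  # placeholder table name

    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        p = event.progress
        row = [(str(p.id), p.name or "unnamed", p.batchId,
                float(p.inputRowsPerSecond or 0.0),
                float(p.processedRowsPerSecond or 0.0),
                p.timestamp)]
        cols = ["query_id", "query_name", "batch_id",
                "input_rows_per_sec", "processed_rows_per_sec", "progress_timestamp"]
        # Keep the work here light; this callback runs on the driver between batches.
        self.spark.createDataFrame(row, cols).write.mode("append").saveAsTable(self.table_name)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(DeltaProgressListener(spark))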
from pyspark.sql.util import QueryExecutionListener
import logging

class MetricsQueryExecutionListener(QueryExecutionListener):
    def __init__(self, metrics_client):
        self.metrics_client = metrics_client
        self.logger = logging.getLogger(__name__)

    def onSuccess(self, funcName, qe, durationNs):
        """Track successful query executions"""
        duration_ms = durationNs / 1000000
        # Extract query complexity metrics from the physical plan
        physical_plan = qe.executedPlan
        stages = len([node for node in physical_plan.children])
        metrics = {
            'query_duration_ms': duration_ms,
            'query_stages': stages,
            'function_name': funcName,
            'success': 1
        }
        self.metrics_client.send_metrics(metrics)

    def onFailure(self, funcName, qe, exception):
        """Track failed query executions"""
        metrics = {
            'function_name': funcName,
            'failure': 1,
            'error_type': type(exception).__name__
        }
        self.metrics_client.send_metrics(metrics)
        self.logger.error(f"Query failed: {funcName}, Error: {exception}")

# Register the listener (your_metrics_client is whatever exporter/forwarder you use)
metrics_listener = MetricsQueryExecutionListener(your_metrics_client)
spark.listenerManager.register(metrics_listener)
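For the serverless SQL point above, a hedged sketch against the query history system table (this assumes system.query.history is enabled in your workspace and that total_duration_ms, read_bytes, and execution_status match your release's column names):

# p95 latency, scanned bytes, and error rate per day over the last 7 days (sketch).
latency_df = spark.sql("""
    SELECT
      DATE(start_time)                    AS query_date,
      PERCENTILE(total_duration_ms, 0.95) AS p95_duration_ms,
      SUM(read_bytes)                     AS total_read_bytes,
      AVG(CASE WHEN execution_status = 'FAILED' THEN 1 ELSE 0 END) AS error_rate
    FROM system.query.history
    WHERE start_time >= DATE_SUB(CURRENT_DATE(), 7)
    GROUP BY DATE(start_time)
    ORDER BY query_date
""")
display(latency_df)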
Best way to manage and forward logs (driver logs, executor logs, audit logs, event logs) in a serverless environment.
Audit logs: the authoritative security/ops timeline
Recommended path: use the audit log system table (system.access.audit) for querying; optionally configure S3 delivery (near-real-time JSON) for downstream tools and SIEMs.
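As a starting point, here is a hedged sketch that pulls the last 24 hours of security-relevant events out of system.access.audit (the service names in the filter are examples; adjust them to the actions you care about):

# Recent audit events: who did what, where (sketch against system.access.audit).
audit_df = spark.sql("""
    SELECT
      event_time,
      user_identity.email AS actor,
      service_name,
      action_name,
      request_params
    FROM system.access.audit
    WHERE event_time >= CURRENT_TIMESTAMP() - INTERVAL 24 HOURS
      AND service_name IN ('accounts', 'unityCatalog', 'secrets')  -- example services
    ORDER BY event_time DESC
""")
display(audit_df)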
Recommended approaches for integrating with external monitoring tools like Amazon CloudWatch, Datadog, or SIEM platforms.
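For CloudWatch, the listener example above already pushes via put_metric_data. For Datadog, a minimal sketch of forwarding a metric over HTTPS from the driver (the secret scope/key, metric name, and tags are placeholders; this uses Datadog's v2 metrics endpoint):

import time
import requests

def send_to_datadog(metric_name, value, tags, api_key):
    """Push a single gauge point to Datadog's v2 metrics API over HTTPS."""
    payload = {
        "series": [{
            "metric": metric_name,
            "type": 3,  # gauge
            "points": [{"timestamp": int(time.time()), "value": float(value)}],
            "tags": tags,
        }]
    }
    resp = requests.post(
        "https://api.datadoghq.com/api/v2/series",
        headers={"DD-API-KEY": api_key, "Content-Type": "application/json"},
        json=payload,
        timeout=10,
    )
    resp.raise_for_status()

# Example call; the secret scope/key and metric/tag names are placeholders.
dd_key = dbutils.secrets.get(scope="observability", key="datadog-api-key")
send_to_datadog("databricks.streaming.input_rows_per_sec", 1234.5,
                ["workload:orders_stream", "env:prod"], dd_key)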
Suggestions for building dashboards, alerts, and anomaly detection to ensure end-to-end observability.
Reliability
Job failure rate, mean time to recovery → system tables (jobs) + audit logs.
Streaming freshness (max event age, batch duration) → StreamingQueryListener + Delta table.
Performance
SQL query latency & scan size → system tables / SQL telemetry.
Cost
DBUs by job/owner/workflow, $/successful run → system tables + your billing exports (or Datadog Cloud Cost Management); a SQL sketch follows this list.
Security & governance
Admin actions, permission changes, token/credential events, external location changes → system.access.audit.
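For the cost line above, a hedged sketch of per-job DBU spend for the current month using the billing system tables (assumes system.billing.usage and system.billing.list_prices are enabled; column names may vary slightly by release):

# Estimated cost per job, month to date (sketch).
cost_df = spark.sql("""
    SELECT
      u.usage_metadata.job_id                    AS job_id,
      SUM(u.usage_quantity)                      AS dbus,
      SUM(u.usage_quantity * lp.pricing.default) AS est_cost_usd
    FROM system.billing.usage u
    JOIN system.billing.list_prices lp
      ON u.sku_name = lp.sku_name
     AND u.usage_start_time >= lp.price_start_time
     AND (lp.price_end_time IS NULL OR u.usage_start_time < lp.price_end_time)
    WHERE u.usage_date >= DATE_TRUNC('MONTH', CURRENT_DATE())
      AND u.usage_metadata.job_id IS NOT NULL
    GROUP BY u.usage_metadata.job_id
    ORDER BY est_cost_usd DESC
""")
display(cost_df)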
2 weeks ago
@APJESK Serverless is designed to relieve DevOps teams from monitoring these types of metrics. You should be able to track cost and usage with the system tables.
2 weeks ago
Databricks is deployed on the AWS platform.
2 weeks ago
Thank you very much. I will go through your solution and get back to you if I have any doubts.
2 weeks ago
Ok, got it. I will start exploring it and get back to you.